# syncer.py (last change committed by Giorgos Korfiatis)
# Copyright (C) 2015 GRNET S.A.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.

import time
import threading
import logging
from collections import defaultdict

from agkyra.syncer import common
from agkyra.syncer.setup import SyncerSettings
from agkyra.syncer.database import transaction
from agkyra.syncer.localfs_client import LocalfsFileClient
from agkyra.syncer.pithos_client import PithosFileClient
from agkyra.syncer import messaging, utils

# Module-level logger; also handed to the messaging.* message objects.
logger = logging.getLogger(__name__)


class FileSyncer(object):
    """Coordinate probing, deciding and syncing between two archives.

    ``master`` and ``slave`` are file clients; their ``SIGNATURE``
    attributes name their archives in the state database. Two further
    pseudo-archives are kept in the database: ``SYNC`` holds the state
    last acknowledged as synced for each object, and ``DECISION`` holds
    the state that was chosen to be synced next.
    """

    dbname = None
    clients = None

    def __init__(self, settings, master, slave):
        self.settings = settings
        self.master = master
        self.slave = slave
        # Names of the pseudo-archives stored alongside the real ones.
        self.DECISION = 'DECISION'
        self.SYNC = 'SYNC'
        self.MASTER = master.SIGNATURE
        self.SLAVE = slave.SIGNATURE
        self.get_db = settings.get_db
        self.clients = {self.MASTER: master, self.SLAVE: slave}
        self.notifiers = {}
        self.decide_thread = None
        self.sync_threads = []
        # (serial, objname) -> state, recorded by mark_as_failed(hard=True).
        self.failed_serials = utils.ThreadSafeDict()
        self.messager = settings.messager
        self.heartbeat = self.settings.heartbeat

    def thread_is_active(self, t):
        """Return a truthy value if thread ``t`` exists and is alive."""
        return t and t.is_alive()

    @property
    def decide_active(self):
        """Whether the periodic decide thread is running."""
        return self.thread_is_active(self.decide_thread)

    @property
    def paused(self):
        """The syncer counts as paused whenever the decide thread is down."""
        return not self.decide_active

    def initiate_probe(self):
        """Start the notifiers and force a probe of both archives."""
        self.start_notifiers()
        self.probe_all(forced=True)

    def start_notifiers(self):
        """Start a notifier for every client that does not have a live one."""
        for signature, client in self.clients.items():
            notifier = self.notifiers.get(signature)
            if not self.thread_is_active(notifier):
                self.notifiers[signature] = client.notifier()
            else:
                logger.info("Notifier %s already up", signature)

    def stop_notifiers(self):
        """Call ``stop()`` on every notifier started so far."""
        for notifier in self.notifiers.values():
            notifier.stop()

    def start_decide(self):
        """Start the periodic decide thread unless it is already running."""
        if not self.decide_active:
            self.decide_thread = self._poll_decide()

    def stop_decide(self):
        """Stop the periodic decide thread if it is running."""
        if self.decide_active:
            self.decide_thread.stop()

    def stop_all_daemons(self):
        """Stop the decide thread and all notifiers."""
        self.stop_decide()
        self.stop_notifiers()

    def wait_sync_threads(self):
        """Block until every sync thread started by sync_file has ended."""
        for thread in self.sync_threads:
            thread.join()

    def get_next_message(self, block=False):
        """Fetch the next message from the settings' messager."""
        return self.messager.get(block=block)

    def probe_file(self, archive, objname):
        """Probe a single object of ``archive`` and record any change.

        On success the object is removed from the client's candidates.
        A ``common.DatabaseError`` is deliberately swallowed: the
        candidate is then left in place with the client.
        """
        ident = utils.time_stamp()
        try:
            self._probe_files(archive, [objname], ident)
            client = self.clients[archive]
            client.remove_candidates([objname], ident)
        except common.DatabaseError:
            pass

    @transaction()
    def _probe_files(self, archive, objnames, ident):
        """Probe each of ``objnames`` under one ``transaction()`` call."""
        for objname in objnames:
            self._do_probe_file(archive, objname, ident)

    def _do_probe_file(self, archive, objname, ident):
        """Probe one object and store its live state if the client
        reports one.

        The probe is skipped (with a message) when a recent heartbeat
        shows a sync in progress for the object, or when the archive's
        stored serial already differs from the SYNC serial.
        """
        logger.info("Probing archive: %s, object: '%s'", archive, objname)
        db = self.get_db()
        client = self.clients[archive]
        db_state = db.get_state(archive, objname)
        ref_state = db.get_state(self.SYNC, objname)

        with self.heartbeat.lock() as hb:
            beat = hb.get(utils.reg_name(objname))
            if beat is not None and utils.younger_than(
                    beat["tstamp"], self.settings.action_max_wait):
                msg = messaging.HeartbeatNoProbeMessage(
                    archive=archive, objname=objname, heartbeat=beat,
                    logger=logger)
                self.messager.put(msg)
                return

        if db_state.serial != ref_state.serial:
            msg = messaging.AlreadyProbedMessage(
                archive=archive, objname=objname, serial=db_state.serial,
                logger=logger)
            self.messager.put(msg)
            return

        live_state = client.probe_file(objname, db_state, ref_state, ident)
        if live_state is not None:
            self.update_file_state(live_state)

    def update_file_state(self, live_state):
        """Store ``live_state`` under a fresh serial.

        Refuses (with a warning) when the stored state's serial no longer
        matches the serial ``live_state`` was probed against.
        """
        db = self.get_db()
        archive = live_state.archive
        objname = live_state.objname
        serial = live_state.serial
        db_state = db.get_state(archive, objname)
        if db_state and db_state.serial != serial:
            logger.warning(
                "Cannot update archive: %s, object: '%s', "
                "serial: %s, db_serial: %s",
                archive, objname, serial, db_state.serial)
            return

        new_serial = db.new_serial(objname)
        new_state = live_state.set(serial=new_serial)
        db.put_state(new_state)
        msg = messaging.UpdateMessage(
            archive=archive, objname=objname,
            serial=new_serial, old_serial=serial, logger=logger)
        self.messager.put(msg)
        if new_serial == 0:
            # Presumably the object's first serial: seed a SYNC state
            # with serial -1 so the new state compares as newer.
            sync_state = common.FileState(
                archive=self.SYNC, objname=objname, serial=-1,
                info={})
            db.put_state(sync_state)

    @transaction()
    def dry_run_decisions(self, objnames, master=None, slave=None):
        """Return the dry-run decision for each of ``objnames``, in order.

        Each entry is a (source, target, sync) state tuple, or None when
        no sync would be decided for that object.
        """
        if master is None:
            master = self.MASTER
        if slave is None:
            slave = self.SLAVE
        decisions = []
        for objname in objnames:
            decision = self._dry_run_decision(objname, master, slave)
            decisions.append(decision)
        return decisions

    def _dry_run_decision(self, objname, master=None, slave=None):
        """Decide ``objname`` without writing decision state or messages."""
        if master is None:
            master = self.MASTER
        if slave is None:
            slave = self.SLAVE
        ident = utils.time_stamp()
        return self._do_decide_file_sync(objname, master, slave, ident, True)

    def decide_file_sync(self, objname, master=None, slave=None):
        """Decide whether ``objname`` needs syncing and, if so, start it.

        A ``common.DatabaseError`` while deciding aborts silently.
        """
        if master is None:
            master = self.MASTER
        if slave is None:
            slave = self.SLAVE
        ident = utils.time_stamp()
        try:
            states = self._decide_file_sync(objname, master, slave, ident)
        except common.DatabaseError:
            return
        if states is None:
            return
        self.sync_file(*states)

    @transaction()
    def _decide_file_sync(self, objname, master, slave, ident):
        """Decide ``objname`` and register a heartbeat for it.

        Returns the (source, target, sync) states to sync, or None when
        sync is disabled or there is nothing to do.
        """
        db = self.get_db()
        if not self.settings._sync_is_enabled(db):
            logger.warning("Cannot decide '%s'; sync disabled.", objname)
            return
        states = self._do_decide_file_sync(objname, master, slave, ident)
        if states is not None:
            with self.heartbeat.lock() as hb:
                beat = {"ident": ident, "tstamp": utils.time_stamp()}
                hb[utils.reg_name(objname)] = beat
        return states

    def _do_decide_file_sync(self, objname, master, slave, ident,
                             dry_run=False):
        """Core decision logic for one object.

        Compares the master, slave, SYNC and DECISION serials of
        ``objname``. Returns a (source_state, target_state, sync_state)
        tuple naming the side to copy from, or None when no sync should
        start. With ``dry_run`` set, no DECISION state is written and no
        messages are emitted.

        Raises AssertionError when the serials are mutually inconsistent.
        """
        db = self.get_db()
        logger.info("Deciding object: '%s'", objname)
        master_state = db.get_state(master, objname)
        slave_state = db.get_state(slave, objname)
        sync_state = db.get_state(self.SYNC, objname)
        decision_state = db.get_state(self.DECISION, objname)
        master_serial = master_state.serial
        slave_serial = slave_state.serial
        sync_serial = sync_state.serial
        decision_serial = decision_state.serial

        with self.heartbeat.lock() as hb:
            beat = hb.get(utils.reg_name(objname))
            logger.debug("object: %s heartbeat: %s", objname, beat)
            if beat is not None:
                if beat["ident"] == ident:
                    # Heartbeat left by this same decision attempt.
                    if not dry_run:
                        msg = messaging.HeartbeatReplayDecideMessage(
                            objname=objname, heartbeat=beat, logger=logger)
                        self.messager.put(msg)
                else:
                    if utils.younger_than(
                            beat["tstamp"], self.settings.action_max_wait):
                        # Another action on this object is still recent.
                        if not dry_run:
                            msg = messaging.HeartbeatNoDecideMessage(
                                objname=objname, heartbeat=beat, logger=logger)
                            self.messager.put(msg)
                        return None
                    logger.warning("Ignoring previous run: %s %s",
                                   objname, beat)

        if decision_serial != sync_serial:
            # A decision was made but not yet acknowledged as synced.
            with self.failed_serials.lock() as d:
                failed_sync = d.get((decision_serial, objname))
            if failed_sync is None:
                logger.warning(
                    "Already decided: '%s', decision: %s, sync: %s",
                    objname, decision_serial, sync_serial)
                if decision_serial == master_serial:
                    return master_state, slave_state, sync_state
                elif decision_serial == slave_serial:
                    return slave_state, master_state, sync_state
                else:
                    raise AssertionError(
                        "Decision serial %s for objname '%s' "
                        "does not match any archive." %
                        (decision_serial, objname))
            else:
                # The pending decision failed hard; fall through and
                # decide afresh from the serials below.
                if not dry_run:
                    msg = messaging.FailedSyncIgnoreDecisionMessage(
                        objname=objname, serial=decision_serial, logger=logger)
                    self.messager.put(msg)

        if master_serial > sync_serial:
            if master_serial == decision_serial:  # this is a failed serial
                return None
            if not dry_run:
                self._make_decision_state(decision_state, master_state)
            return master_state, slave_state, sync_state

        elif master_serial == sync_serial:
            if slave_serial > sync_serial:
                if slave_serial == decision_serial:  # this is a failed serial
                    return None
                if not dry_run:
                    self._make_decision_state(decision_state, slave_state)
                return slave_state, master_state, sync_state
            elif slave_serial == sync_serial:
                return None
            else:
                raise AssertionError("Slave serial %s, sync serial %s"
                                     % (slave_serial, sync_serial))

        else:
            raise AssertionError("Master serial %s, sync serial %s"
                                 % (master_serial, sync_serial))

    def _make_decision_state(self, decision_state, source_state):
        """Record ``source_state``'s serial and info as the DECISION."""
        db = self.get_db()
        new_decision_state = decision_state.set(
            serial=source_state.serial, info=source_state.info)
        db.put_state(new_decision_state)

    def sync_file(self, source_state, target_state, sync_state):
        """Announce a sync and run _sync_file on a new thread."""
        msg = messaging.SyncMessage(
            objname=source_state.objname,
            archive=source_state.archive,
            serial=source_state.serial,
            info=source_state.info,
            logger=logger)
        self.messager.put(msg)
        thread = threading.Thread(
            target=self._sync_file,
            args=(source_state, target_state, sync_state))
        thread.start()
        self.sync_threads.append(thread)

    def _sync_file(self, source_state, target_state, sync_state):
        """Stage the source file and have the target client pull it.

        Completion is reported through ack_file_sync and failure through
        mark_as_failed. A ``common.SyncError`` while staging is logged and
        the sync is abandoned.
        """
        clients = self.clients
        source_client = clients[source_state.archive]
        try:
            source_handle = source_client.stage_file(source_state)
        except common.SyncError as e:
            logger.warning(e)
            return
        target_client = clients[target_state.archive]
        target_client.start_pulling_file(
            source_handle, target_state, sync_state,
            callback=self.ack_file_sync,
            failure_callback=self.mark_as_failed)

    def mark_as_failed(self, state, hard=False):
        """Drop the object's heartbeat after a failed sync.

        With ``hard`` set, the (serial, objname) pair is also recorded in
        ``failed_serials`` so later decisions ignore that serial.
        """
        serial = state.serial
        objname = state.objname
        with self.heartbeat.lock() as hb:
            hb.pop(utils.reg_name(objname))
        if hard:
            logger.warning(
                "Marking failed serial %s for archive: %s, object: '%s'",
                serial, state.archive, objname)
            with self.failed_serials.lock() as d:
                d[(serial, objname)] = state

    def update_state(self, old_state, new_state):
        """Store ``new_state``; ``old_state`` is currently unused."""
        db = self.get_db()
        db.put_state(new_state)
        # here we could do any checks needed on the old state,
        # perhaps triggering a probe

    def ack_file_sync(self, synced_source_state, synced_target_state):
        """Record a completed sync, clear its heartbeat and announce it.

        A ``common.DatabaseError`` while recording aborts silently (the
        heartbeat is then left in place).
        """
        try:
            self._ack_file_sync(synced_source_state, synced_target_state)
        except common.DatabaseError:
            return
        serial = synced_source_state.serial
        objname = synced_source_state.objname
        target = synced_target_state.archive
        with self.heartbeat.lock() as hb:
            hb.pop(utils.reg_name(objname))
        msg = messaging.AckSyncMessage(
            archive=target, objname=objname, serial=serial,
            logger=logger)
        self.messager.put(msg)

    @transaction()
    def _ack_file_sync(self, synced_source_state, synced_target_state):
        """Store the synced source/target states and advance SYNC and
        DECISION to the synced serial.

        Raises AssertionError when the serial is not the pending decision,
        and ``common.SyncError`` when it is not newer than the SYNC serial.
        """
        db = self.get_db()
        serial = synced_source_state.serial
        objname = synced_source_state.objname
        source = synced_source_state.archive
        target = synced_target_state.archive
        tinfo = synced_target_state.info
        logger.debug("Acking archive: %s, object: '%s', serial: %s "
                     "info: %s", target, objname, serial, tinfo)
        decision_state = db.get_state(self.DECISION, objname)
        sync_state = db.get_state(self.SYNC, objname)

        if serial != decision_state.serial:
            raise AssertionError(
                "Serial mismatch: assumed sync %s, decision %s"
                % (serial, decision_state.serial))
        if serial <= sync_state.serial:
            raise common.SyncError(
                "cannot ack: serial %s <= sync serial %s" %
                (serial, sync_state.serial))

        db_source_state = db.get_state(source, objname)
        self.update_state(db_source_state, synced_source_state)

        final_target_state = synced_target_state.set(
            serial=serial)
        db_target_state = db.get_state(target, objname)
        self.update_state(db_target_state, final_target_state)

        sync_info = dict(synced_source_state.info)
        sync_info.update(synced_target_state.info)
        # The 'info' namespace is global. Some attributes may be globally
        # recognizable by all clients with the same semantics, such as
        # a content-hash (e.g. SHA256), while other may be specific to
        # each client. Clients are responsible to protect their private
        # attributes creating their own namespace, for example
        # 'localfs_mtime', 'object_store_etag'
        new_sync_state = sync_state.set(serial=serial, info=sync_info)
        db.put_state(new_sync_state)
        new_decision_state = new_sync_state.set(archive=self.DECISION)
        db.put_state(new_decision_state)

    def list_deciding(self, archives=None):
        """Return the set of object names that are pending a decision.

        ``archives`` defaults to (MASTER, SLAVE). A
        ``common.DatabaseError`` triggers another attempt; this loops
        until a read succeeds (iteratively, so repeated failures cannot
        exhaust the recursion limit).
        """
        while True:
            try:
                return self._list_deciding(archives=archives)
            except common.DatabaseError:
                continue

    @transaction()
    def _list_deciding(self, archives=None):
        """Query the database for objects to decide in ``archives``."""
        db = self.get_db()
        if archives is None:
            archives = (self.MASTER, self.SLAVE)
        return set(db.list_deciding(archives=archives,
                                    sync=self.SYNC))

    def probe_archive(self, archive, forced=False):
        """Probe every candidate file the client reports for ``archive``.

        A ``common.DatabaseError`` is swallowed, leaving the candidates
        with the client.
        """
        ident = utils.time_stamp()
        client = self.clients[archive]
        try:
            candidates = client.list_candidate_files(forced=forced)
            self._probe_files(archive, candidates, ident)
            client.remove_candidates(candidates, ident)
        except common.DatabaseError:
            pass

    def decide_archive(self, archive=None):
        """Run decide_file_sync for every pending object.

        With ``archive`` given, only that archive is consulted.
        """
        try:
            archives = [archive] if archive is not None else None
            for objname in self.list_deciding(archives):
                self.decide_file_sync(objname)
        except common.DatabaseError:
            pass

    def decide_all_archives(self):
        """Probe both archives, then decide all pending objects."""
        logger.debug("Checking candidates to sync")
        self.probe_all()
        self.decide_archive()

    def probe_all(self, forced=False):
        """Probe the master archive, then the slave archive."""
        self.probe_archive(self.MASTER, forced=forced)
        self.probe_archive(self.SLAVE, forced=forced)

    def _poll_decide(self, interval=3):
        """Start (via utils.start_daemon) and return a thread whose body
        runs decide_all_archives and then sleeps ``interval`` seconds.

        Presumably utils.StoppableThread repeats run_body until stop()
        is called -- confirm in agkyra.syncer.utils.
        """
        class DecideThread(utils.StoppableThread):
            def run_body(this):
                self.decide_all_archives()
                time.sleep(interval)
        return utils.start_daemon(DecideThread)

    def check_decisions(self):
        """Dry-run a decision for every pending object.

        Returns a ``defaultdict(list)`` that maps each source archive
        signature to the names of objects that would be synced from it.
        Objects for which no sync would currently be decided are left out.
        """
        deciding = self.list_deciding()
        decisions = self.dry_run_decisions(deciding)
        by_source = defaultdict(list)
        for decision in decisions:
            if decision is None:
                # Nothing to sync right now (in sync, recent heartbeat,
                # or a known failed serial).
                continue
            source_state = decision[0]
            by_source[source_state.archive].append(source_state.objname)
        return by_source

    # TODO cleanup db of objects deleted in all clients
    # def cleanup(self):
    #     db = self.get_db()
    #     master_deleted = set(db.list_files_with_info(MASTER, {}))
    #     client_deleted = set(db.list_files_with_info(SLAVE, {}))
    #     deleted = master_deleted.intersection(client_deleted)


def conf(auth_url, auth_token, container, local_root_path, **kwargs):
    """Build a FileSyncer between a Pithos container and a local directory.

    The PithosFileClient serves as the master archive and the
    LocalfsFileClient as the slave. Any extra keyword arguments are
    forwarded to SyncerSettings.
    """
    settings = SyncerSettings(
        auth_url=auth_url,
        auth_token=auth_token,
        container=container,
        local_root_path=local_root_path,
        **kwargs)
    # Arguments are evaluated left to right: the master client is built first.
    return FileSyncer(settings,
                      PithosFileClient(settings),
                      LocalfsFileClient(settings))