# Copyright (C) 2015 GRNET S.A.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.

import time
import threading
import logging

from agkyra.syncer import common
from agkyra.syncer.setup import SyncerSettings
from agkyra.syncer.database import transaction
from agkyra.syncer.localfs_client import LocalfsFileClient
from agkyra.syncer.pithos_client import PithosFileClient
from agkyra.syncer import messaging, utils

logger = logging.getLogger(__name__)


class FileSyncer(object):

    dbname = None
    clients = None

    def __init__(self, settings, master, slave):
        """Bind the syncer to its settings and its two archive clients.

        :param settings: settings object; ``get_db``, ``messager`` and
            ``heartbeat`` are read from it here.
        :param master: client of the master archive; its ``SIGNATURE``
            becomes the master archive name.
        :param slave: client of the slave archive; its ``SIGNATURE``
            becomes the slave archive name.
        """
        # Names of the bookkeeping archives used in database state lookups.
        self.DECISION = 'DECISION'
        self.SYNC = 'SYNC'

        # The two real archives, addressable by client signature.
        self.master = master
        self.slave = slave
        self.MASTER = master.SIGNATURE
        self.SLAVE = slave.SIGNATURE
        self.clients = {self.MASTER: master, self.SLAVE: slave}

        # Facilities taken over from the settings object.
        self.settings = settings
        self.get_db = settings.get_db
        self.messager = settings.messager
        self.heartbeat = settings.heartbeat

        # Runtime bookkeeping: notifier per archive, the decide daemon,
        # spawned sync threads and serials whose sync failed.
        self.notifiers = {}
        self.decide_thread = None
        self.sync_threads = []
        self.failed_serials = utils.ThreadSafeDict()
51

52
53
54
55
56
57
58
    def thread_is_active(self, t):
        """Tell whether ``t`` is a thread that is currently alive.

        :param t: a thread-like object exposing ``is_alive()``, or None.
        :returns: always a real ``bool``; False when ``t`` is None (or
            otherwise falsy) or when the thread is not alive.
        """
        # Coerce explicitly so callers never receive None (or the thread
        # object's own truthiness) from what is meant to be a predicate.
        return bool(t and t.is_alive())

    @property
    def decide_active(self):
        return self.thread_is_active(self.decide_thread)

59
60
    @property
    def paused(self):
61
        return not self.decide_active
62

63
    def initiate_probe(self):
64
        self.start_notifiers()
65
        self.probe_all(forced=True)
66
67

    def start_notifiers(self):
68
69
70
        for signature, client in self.clients.iteritems():
            notifier = self.notifiers.get(signature)
            if not self.thread_is_active(notifier):
Giorgos Korfiatis's avatar
Giorgos Korfiatis committed
71
                self.notifiers[signature] = client.notifier()
72
            else:
Giorgos Korfiatis's avatar
Giorgos Korfiatis committed
73
                logger.info("Notifier %s already up" % signature)
74
75
76
77

    def stop_notifiers(self):
        for notifier in self.notifiers.values():
            notifier.stop()
78
79

    def start_decide(self):
80
81
82
83
84
85
        if not self.decide_active:
            self.decide_thread = self._poll_decide()

    def stop_decide(self):
        if self.decide_active:
            self.decide_thread.stop()
86

87
88
89
90
91
92
93
    def stop_all_daemons(self):
        self.stop_decide()
        self.stop_notifiers()

    def wait_sync_threads(self):
        for thread in self.sync_threads:
            thread.join()
94

Giorgos Korfiatis's avatar
Giorgos Korfiatis committed
95
96
    def get_next_message(self, block=False):
        return self.messager.get(block=block)
97

Giorgos Korfiatis's avatar
Giorgos Korfiatis committed
98
    def probe_file(self, archive, objname):
        """Probe a single object of ``archive`` and drop it as a candidate.

        A fresh time stamp identifies this probe. A ``DatabaseError`` raised
        while probing or while removing the candidate is swallowed.

        :param archive: key into ``self.clients``.
        :param objname: name of the object to probe.
        """
        probe_ident = utils.time_stamp()
        try:
            self._probe_files(archive, [objname], probe_ident)
            self.clients[archive].remove_candidates([objname], probe_ident)
        except common.DatabaseError:
            pass
106

107
108
109
110
111
112
    @transaction()
    def _probe_files(self, archive, objnames, ident):
        """Probe each name in ``objnames`` under the ``transaction`` decorator.

        :param archive: archive the objects belong to.
        :param objnames: iterable of object names.
        :param ident: identifier of this probe run, passed to every probe.
        """
        for name in objnames:
            self._do_probe_file(archive, name, ident)

    def _do_probe_file(self, archive, objname, ident):
        """Probe one object and record its live state, if one is reported.

        The probe is skipped (with a message put on the messager) when the
        object has a heartbeat younger than ``settings.action_max_wait``, or
        when the archive serial already differs from the SYNC serial.

        :param archive: key into ``self.clients``.
        :param objname: name of the object to probe.
        :param ident: identifier of this probe run, handed to the client.
        """
        logger.info("Probing archive: %s, object: '%s'" % (archive, objname))
        db = self.get_db()
        client = self.clients[archive]
        db_state = db.get_state(archive, objname)
        ref_state = db.get_state(self.SYNC, objname)

        with self.heartbeat.lock() as hb:
            beat = hb.get(objname)
            if beat is not None and utils.younger_than(
                    beat["tstamp"], self.settings.action_max_wait):
                # A recent heartbeat exists for this object: do not probe.
                self.messager.put(messaging.HeartbeatNoProbeMessage(
                    archive=archive, objname=objname, heartbeat=beat,
                    logger=logger))
                return

        if db_state.serial != ref_state.serial:
            # The archive state has already moved away from the SYNC state.
            self.messager.put(messaging.AlreadyProbedMessage(
                archive=archive, objname=objname, serial=db_state.serial,
                logger=logger))
            return

        live_state = client.probe_file(objname, db_state, ref_state, ident)
        if live_state is None:
            return
        self.update_file_state(live_state)
137

Giorgos Korfiatis's avatar
Giorgos Korfiatis committed
138
    def update_file_state(self, live_state):
        """Store ``live_state`` in the database under a newly issued serial.

        The update is refused (with a warning) when the database already
        holds a state for the object whose serial differs from the one
        carried by ``live_state``. When the new serial is 0, a SYNC state
        with serial -1 and empty info is stored as well.

        :param live_state: state object exposing ``archive``, ``objname``,
            ``serial`` and ``set``.
        """
        db = self.get_db()
        archive = live_state.archive
        objname = live_state.objname
        old_serial = live_state.serial

        stored_state = db.get_state(archive, objname)
        if stored_state and stored_state.serial != old_serial:
            logger.warning(
                "Cannot update archive: %s, object: '%s', "
                "serial: %s, db_serial: %s" %
                (archive, objname, old_serial, stored_state.serial))
            return

        new_serial = db.new_serial(objname)
        db.put_state(live_state.set(serial=new_serial))
        self.messager.put(messaging.UpdateMessage(
            archive=archive, objname=objname,
            serial=new_serial, old_serial=old_serial, logger=logger))

        if new_serial != 0:
            return
        # Serial 0: also store the initial SYNC state for this object.
        db.put_state(common.FileState(
            archive=self.SYNC, objname=objname, serial=-1,
            info={}))

Giorgos Korfiatis's avatar
Giorgos Korfiatis committed
164
    def decide_file_sync(self, objname, master=None, slave=None):
        """Decide whether ``objname`` needs syncing and start the sync if so.

        :param objname: name of the object to decide on.
        :param master: master archive name; defaults to ``self.MASTER``.
        :param slave: slave archive name; defaults to ``self.SLAVE``.

        Returns None in every case; a ``DatabaseError`` raised while
        deciding is swallowed.
        """
        master = self.MASTER if master is None else master
        slave = self.SLAVE if slave is None else slave
        ident = utils.time_stamp()
        try:
            states = self._decide_file_sync(objname, master, slave, ident)
        except common.DatabaseError:
            return
        if states is not None:
            # states is (source_state, target_state, sync_state).
            self.sync_file(*states)
177
178

    @transaction()
    def _decide_file_sync(self, objname, master, slave, ident):
        """Take the sync decision and, if positive, register a heartbeat.

        :returns: the value of ``_do_decide_file_sync``: either None or a
            ``(source_state, target_state, sync_state)`` tuple.
        """
        states = self._do_decide_file_sync(objname, master, slave, ident)
        if states is None:
            return states
        with self.heartbeat.lock() as hb:
            # Mark the object as being handled by this decision run.
            hb[objname] = {"ident": ident, "tstamp": utils.time_stamp()}
        return states

Giorgos Korfiatis's avatar
Giorgos Korfiatis committed
187
    def _do_decide_file_sync(self, objname, master, slave, ident):
        """Work out whether, and in which direction, ``objname` must sync.

        :param objname: name of the object to decide on.
        :param master: name of the master archive.
        :param slave: name of the slave archive.
        :param ident: identifier of the current decision run.
        :returns: None when there is nothing to do, otherwise a
            ``(source_state, target_state, sync_state)`` tuple.
        :raises AssertionError: when the stored serials are inconsistent.
        """
        db = self.get_db()
        logger.info("Deciding object: '%s'" % objname)
        master_state = db.get_state(master, objname)
        slave_state = db.get_state(slave, objname)
        sync_state = db.get_state(self.SYNC, objname)
        decision_state = db.get_state(self.DECISION, objname)

        master_serial = master_state.serial
        slave_serial = slave_state.serial
        sync_serial = sync_state.serial
        decision_serial = decision_state.serial

        # The two possible outcomes, named by the archive acting as source.
        from_master = (master_state, slave_state, sync_state)
        from_slave = (slave_state, master_state, sync_state)

        with self.heartbeat.lock() as hb:
            beat = hb.get(objname)
            logger.debug("object: %s heartbeat: %s" % (objname, beat))
            if beat is not None:
                if beat["ident"] == ident:
                    # Heartbeat left by this very run.
                    self.messager.put(messaging.HeartbeatReplayDecideMessage(
                        objname=objname, heartbeat=beat, logger=logger))
                elif utils.younger_than(
                        beat["tstamp"], self.settings.action_max_wait):
                    # Another run touched the object recently: back off.
                    self.messager.put(messaging.HeartbeatNoDecideMessage(
                        objname=objname, heartbeat=beat, logger=logger))
                    return None
                else:
                    logger.warning("Ignoring previous run: %s %s" %
                                   (objname, beat))

        if decision_serial != sync_serial:
            with self.failed_serials.lock() as d:
                failed_sync = d.get((decision_serial, objname))
            if failed_sync is not None:
                self.messager.put(messaging.FailedSyncIgnoreDecisionMessage(
                    objname=objname, serial=decision_serial, logger=logger))
            else:
                # A decision exists that is neither synced nor marked failed.
                logger.warning(
                    "Already decided: '%s', decision: %s, sync: %s" %
                    (objname, decision_serial, sync_serial))
                if decision_serial == master_serial:
                    return from_master
                if decision_serial == slave_serial:
                    return from_slave
                raise AssertionError(
                    "Decision serial %s for objname '%s' "
                    "does not match any archive." %
                    (decision_serial, objname))

        if master_serial > sync_serial:
            if master_serial == decision_serial:  # this is a failed serial
                return None
            self._make_decision_state(decision_state, master_state)
            return from_master
        if master_serial != sync_serial:
            raise AssertionError("Master serial %s, sync serial %s"
                                 % (master_serial, sync_serial))

        # From here on the master archive is in sync.
        if slave_serial > sync_serial:
            if slave_serial == decision_serial:  # this is a failed serial
                return None
            self._make_decision_state(decision_state, slave_state)
            return from_slave
        if slave_serial != sync_serial:
            raise AssertionError("Slave serial %s, sync serial %s"
                                 % (slave_serial, sync_serial))
        return None

Giorgos Korfiatis's avatar
Giorgos Korfiatis committed
259
    def _make_decision_state(self, decision_state, source_state):
260
        db = self.get_db()
Giorgos Korfiatis's avatar
Giorgos Korfiatis committed
261
262
263
264
        new_decision_state = decision_state.set(
            serial=source_state.serial, info=source_state.info)
        db.put_state(new_decision_state)

Giorgos Korfiatis's avatar
Giorgos Korfiatis committed
265
    def sync_file(self, source_state, target_state, sync_state):
        """Announce a sync and run it in a newly started thread.

        A ``SyncMessage`` describing the source state is put on the
        messager; then ``_sync_file`` is started in its own thread, which is
        appended to ``self.sync_threads``.
        """
        self.messager.put(messaging.SyncMessage(
            objname=source_state.objname,
            archive=source_state.archive,
            serial=source_state.serial,
            info=source_state.info,
            logger=logger))
        states = (source_state, target_state, sync_state)
        worker = threading.Thread(target=self._sync_file, args=states)
        worker.start()
        self.sync_threads.append(worker)
278

Giorgos Korfiatis's avatar
Giorgos Korfiatis committed
279
    def _sync_file(self, source_state, target_state, sync_state):
280
281
282
283
284
285
286
287
288
289
        clients = self.clients
        source_client = clients[source_state.archive]
        try:
            source_handle = source_client.stage_file(source_state)
        except common.SyncError as e:
            logger.warning(e)
            return
        target_client = clients[target_state.archive]
        target_client.start_pulling_file(
            source_handle, target_state, sync_state,
Giorgos Korfiatis's avatar
Giorgos Korfiatis committed
290
            callback=self.ack_file_sync,
Giorgos Korfiatis's avatar
Giorgos Korfiatis committed
291
292
            failure_callback=self.mark_as_failed)

293
    def mark_as_failed(self, state, hard=False):
Giorgos Korfiatis's avatar
Giorgos Korfiatis committed
294
        serial = state.serial
Giorgos Korfiatis's avatar
Giorgos Korfiatis committed
295
        objname = state.objname
296
        with self.heartbeat.lock() as hb:
297
            hb.pop(objname)
298
299
300
301
302
303
        if hard:
            logger.warning(
                "Marking failed serial %s for archive: %s, object: '%s'" %
                (serial, state.archive, objname))
            with self.failed_serials.lock() as d:
                d[(serial, objname)] = state
304
305
306
307
308
309
310

    def update_state(self, old_state, new_state):
        db = self.get_db()
        db.put_state(new_state)
        # here we could do any checks needed on the old state,
        # perhaps triggering a probe

Giorgos Korfiatis's avatar
Giorgos Korfiatis committed
311
    def ack_file_sync(self, synced_source_state, synced_target_state):
        """Acknowledge a completed sync and announce it on the messager.

        The states are first recorded through ``_ack_file_sync``; a
        ``DatabaseError`` raised there ends the call silently. Afterwards
        the heartbeat of the object is released and an ``AckSyncMessage``
        is put on the messager.

        :param synced_source_state: state of the source after the sync.
        :param synced_target_state: state of the target after the sync.
        """
        try:
            self._ack_file_sync(synced_source_state, synced_target_state)
        except common.DatabaseError:
            return
        serial = synced_source_state.serial
        objname = synced_source_state.objname
        target = synced_target_state.archive
        with self.heartbeat.lock() as hb:
            # Tolerate a heartbeat that is already gone: the ack has been
            # stored at this point, and a KeyError here would suppress the
            # AckSyncMessage below.
            hb.pop(objname, None)
        msg = messaging.AckSyncMessage(
            archive=target, objname=objname, serial=serial,
            logger=logger)
        self.messager.put(msg)
325
326
327
328
329
330
331
332

    @transaction()
    def _ack_file_sync(self, synced_source_state, synced_target_state):
        """Record a completed sync in the database.

        Stores the synced source state, the target state stamped with the
        source serial, and new SYNC and DECISION states carrying that serial
        and the merged info of both sides.

        :raises AssertionError: when the synced serial is not the one
            recorded in the DECISION state.
        :raises common.SyncError: when the synced serial is not greater
            than the serial of the SYNC state.
        """
        db = self.get_db()
        serial = synced_source_state.serial
        objname = synced_source_state.objname
        source = synced_source_state.archive
        target = synced_target_state.archive
        tinfo = synced_target_state.info
        logger.debug("Acking archive: %s, object: '%s', serial: %s "
                     "info: %s" %
                     (target, objname, serial, tinfo))
        decision_state = db.get_state(self.DECISION, objname)
        sync_state = db.get_state(self.SYNC, objname)

        if serial != decision_state.serial:
            raise AssertionError(
                "Serial mismatch: assumed sync %s, decision %s"
                % (serial, decision_state.serial))
        if serial <= sync_state.serial:
            # The message now states the condition actually tested; it used
            # to read "<", which is misleading when the serials are equal.
            raise common.SyncError(
                "cannot ack: serial %s <= sync serial %s" %
                (serial, sync_state.serial))

        db_source_state = db.get_state(source, objname)
        self.update_state(db_source_state, synced_source_state)

        # The target takes over the serial of the source it was synced from.
        final_target_state = synced_target_state.set(
            serial=serial)
        db_target_state = db.get_state(target, objname)
        self.update_state(db_target_state, final_target_state)

        # Target info overrides source info on conflicting keys.
        sync_info = dict(synced_source_state.info)
        sync_info.update(synced_target_state.info)
        # The 'info' namespace is global. Some attributes may be globally
        # recognizable by all clients with the same semantics, such as
        # a content-hash (e.g. SHA256), while others may be specific to
        # each client. Clients are responsible for protecting their private
        # attributes by creating their own namespace, for example
        # 'localfs_mtime', 'object_store_etag'
        new_sync_state = sync_state.set(serial=serial, info=sync_info)
        db.put_state(new_sync_state)
        new_decision_state = new_sync_state.set(archive=self.DECISION)
        db.put_state(new_decision_state)

    @transaction()
    def list_deciding(self, archives=None):
        """Return the set of names reported by the database's ``list_deciding``.

        :param archives: archives to consider; defaults to the master and
            slave archives.
        :returns: a ``set`` built from the database's answer.
        """
        db = self.get_db()
        selected = (self.MASTER, self.SLAVE) if archives is None else archives
        deciding = db.list_deciding(archives=selected, sync=self.SYNC)
        return set(deciding)
377

378
    def probe_archive(self, archive, forced=False):
        """Probe all candidate files of ``archive``.

        The candidates are listed by the archive's client, probed, and then
        removed from the client's candidates. A ``DatabaseError`` raised
        along the way is swallowed.

        :param archive: key into ``self.clients``.
        :param forced: forwarded to the client's ``list_candidate_files``.
        """
        probe_ident = utils.time_stamp()
        archive_client = self.clients[archive]
        try:
            names = archive_client.list_candidate_files(forced=forced)
            self._probe_files(archive, names, probe_ident)
            archive_client.remove_candidates(names, probe_ident)
        except common.DatabaseError:
            pass
387

388
389
390
391
392
393
394
    def decide_archive(self, archive=None):
        try:
            archives = [archive] if archive is not None else None
            for objname in self.list_deciding(archives):
                self.decide_file_sync(objname)
        except common.DatabaseError:
            pass
395
396

    def decide_all_archives(self):
        """Probe both archives, then decide on every pending object."""
        logger.debug("Checking candidates to sync")
        for step in (self.probe_all, self.decide_archive):
            step()
400

401
402
403
404
    def probe_all(self, forced=False):
        self.probe_archive(self.MASTER, forced=forced)
        self.probe_archive(self.SLAVE, forced=forced)

405
    def _poll_decide(self, interval=3):
        """Start a daemon whose body decides on all archives, then sleeps.

        :param interval: seconds slept after each pass.
        :returns: whatever ``utils.start_daemon`` returns for the thread
            class (stored by ``start_decide`` as the decide thread).
        """
        syncer = self

        class DecideThread(utils.StoppableThread):
            # NOTE(review): run_body is presumably invoked repeatedly by
            # StoppableThread until the thread is stopped -- confirm.
            def run_body(thread):
                syncer.decide_all_archives()
                time.sleep(interval)

        return utils.start_daemon(DecideThread)

412
413
414
415
416
417
418
419
    # TODO cleanup db of objects deleted in all clients
    # def cleanup(self):
    #     db = self.get_db()
    #     master_deleted = set(db.list_files_with_info(MASTER, {}))
    #     client_deleted = set(db.list_files_with_info(SLAVE, {}))
    #     deleted = master_deleted.intersection(client_deleted)


Giorgos Korfiatis's avatar
Giorgos Korfiatis committed
420
421
def conf(auth_url, auth_token, container, local_root_path, **kwargs):
    """Build a FileSyncer with a Pithos master and a local-filesystem slave.

    All arguments, including any extra keyword arguments, are handed to
    ``SyncerSettings``; both clients are created from the resulting
    settings object.

    :returns: a ``FileSyncer`` bound to those settings and clients.
    """
    options = dict(
        kwargs,
        auth_url=auth_url,
        auth_token=auth_token,
        container=container,
        local_root_path=local_root_path)
    settings = SyncerSettings(**options)
    # The master client is created before the slave client, as before.
    pithos_master = PithosFileClient(settings)
    localfs_slave = LocalfsFileClient(settings)
    return FileSyncer(settings, pithos_master, localfs_slave)