syncer.py 16.2 KB
Newer Older
Giorgos Korfiatis's avatar
Giorgos Korfiatis committed
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
# Copyright (C) 2015 GRNET S.A.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.

16
17
18
19
20
21
22
23
24
import time
import threading
import logging

from agkyra.syncer import common
from agkyra.syncer.setup import SyncerSettings
from agkyra.syncer.database import transaction
from agkyra.syncer.localfs_client import LocalfsFileClient
from agkyra.syncer.pithos_client import PithosFileClient
25
from agkyra.syncer import messaging, utils
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40

logger = logging.getLogger(__name__)


class FileSyncer(object):
    """Drive synchronization between a master and a slave archive client.

    The methods of this class probe both clients for changes, decide which
    side's state should be propagated for each object, and start the
    transfer, recording states in the database obtained through
    ``settings.get_db``.
    """

    # NOTE(review): not assigned or read by any method of this class as far
    # as can be seen here -- confirm whether it is still needed.
    dbname = None
    # Class-level default; __init__ replaces it on each instance with a
    # mapping from archive signature to client.
    clients = None

    def __init__(self, settings, master, slave):
        """Set up a syncer around two archive clients.

        ``settings`` must provide ``get_db``, ``messager`` and ``heartbeat``.
        ``master`` and ``slave`` are the two clients; each one's
        ``SIGNATURE`` is used as the name of its archive.
        """
        # Collaborators taken from the settings object.
        self.settings = settings
        self.get_db = settings.get_db
        self.messager = settings.messager
        self.heartbeat = settings.heartbeat

        # Archive names: two fixed bookkeeping archives plus one per client.
        self.DECISION = 'DECISION'
        self.SYNC = 'SYNC'
        self.master = master
        self.slave = slave
        self.MASTER = master.SIGNATURE
        self.SLAVE = slave.SIGNATURE
        self.clients = {self.MASTER: master, self.SLAVE: slave}

        # Runtime state: nothing is running yet.
        self.notifiers = {}
        self.decide_thread = None
        self.sync_threads = []
        self.failed_serials = utils.ThreadSafeDict()
51

52
53
54
55
56
57
58
    def thread_is_active(self, t):
        """Tell whether ``t`` is a live thread.

        A falsy ``t`` (e.g. ``None``) is returned unchanged; otherwise the
        result of ``t.is_alive()`` is returned.
        """
        if not t:
            return t
        return t.is_alive()

    @property
    def decide_active(self):
        return self.thread_is_active(self.decide_thread)

59
60
    @property
    def paused(self):
61
        return not self.decide_active
62

63
    def initiate_probe(self):
64
        self.start_notifiers()
65
        self.probe_all(forced=True)
66
67

    def start_notifiers(self):
68
69
70
        for signature, client in self.clients.iteritems():
            notifier = self.notifiers.get(signature)
            if not self.thread_is_active(notifier):
Giorgos Korfiatis's avatar
Giorgos Korfiatis committed
71
                self.notifiers[signature] = client.notifier()
72
            else:
Giorgos Korfiatis's avatar
Giorgos Korfiatis committed
73
                logger.info("Notifier %s already up" % signature)
74
75
76
77

    def stop_notifiers(self):
        for notifier in self.notifiers.values():
            notifier.stop()
78
79

    def start_decide(self):
80
81
82
83
84
85
        if not self.decide_active:
            self.decide_thread = self._poll_decide()

    def stop_decide(self):
        if self.decide_active:
            self.decide_thread.stop()
86

87
88
89
90
91
92
93
    def stop_all_daemons(self):
        self.stop_decide()
        self.stop_notifiers()

    def wait_sync_threads(self):
        for thread in self.sync_threads:
            thread.join()
94

Giorgos Korfiatis's avatar
Giorgos Korfiatis committed
95
96
    def get_next_message(self, block=False):
        return self.messager.get(block=block)
97

Giorgos Korfiatis's avatar
Giorgos Korfiatis committed
98
    def probe_file(self, archive, objname):
        """Probe a single object of ``archive``.

        A fresh time stamp identifies this run. After the probe, the object
        is removed from the client's candidates. A ``common.DatabaseError``
        raised by either step is swallowed.
        """
        stamp = utils.time_stamp()
        try:
            self._probe_files(archive, [objname], stamp)
            self.clients[archive].remove_candidates([objname], stamp)
        except common.DatabaseError:
            pass
106

107
108
109
110
111
112
    @transaction()
    def _probe_files(self, archive, objnames, ident):
        """Probe each name in ``objnames``, in order, under ``@transaction()``."""
        for name in objnames:
            self._do_probe_file(archive, name, ident)

    def _do_probe_file(self, archive, objname, ident):
        """Probe one object of an archive and store any new state found.

        The probe is skipped, and a message is queued instead, in two cases:

        * a heartbeat exists for ``objname`` and its time stamp is younger
          than ``settings.action_max_wait`` (``HeartbeatNoProbeMessage``);
        * the archive's stored serial differs from the SYNC serial
          (``AlreadyProbedMessage``).

        Otherwise the client is asked to probe the object, and a non-None
        result is passed to ``update_file_state``.

        ``ident`` is forwarded to the client's ``probe_file``.
        """
        logger.info("Probing archive: %s, object: '%s'" % (archive, objname))
        db = self.get_db()
        client = self.clients[archive]
        db_state = db.get_state(archive, objname)
        ref_state = db.get_state(self.SYNC, objname)
        with self.heartbeat.lock() as hb:
            beat = hb.get(objname)
            if beat is not None:
                if utils.younger_than(
                        beat["tstamp"], self.settings.action_max_wait):
                    msg = messaging.HeartbeatNoProbeMessage(
                        archive=archive, objname=objname, heartbeat=beat,
                        logger=logger)
                    self.messager.put(msg)
                    return
        if db_state.serial != ref_state.serial:
            # Report the archive's stored serial. The previous code passed an
            # undefined name ``serial`` here, so this branch raised NameError
            # instead of queueing the message.
            msg = messaging.AlreadyProbedMessage(
                archive=archive, objname=objname, serial=db_state.serial,
                logger=logger)
            self.messager.put(msg)
            return
        live_state = client.probe_file(objname, db_state, ref_state, ident)
        if live_state is not None:
            self.update_file_state(live_state)
136

Giorgos Korfiatis's avatar
Giorgos Korfiatis committed
137
    def update_file_state(self, live_state):
        """Store a freshly probed state under a new serial.

        An ``UpdateMessage`` is queued first. If the database already holds
        a state for the object whose serial differs from the probed one, a
        warning is logged and nothing is stored. Otherwise the state is
        saved with the serial returned by ``db.new_serial``; when that
        serial is 0, a SYNC state with serial -1 and empty info is stored
        as well.
        """
        db = self.get_db()
        archive, objname, serial = (
            live_state.archive, live_state.objname, live_state.serial)
        self.messager.put(messaging.UpdateMessage(
            archive=archive, objname=objname, serial=serial, logger=logger))

        stored = db.get_state(archive, objname)
        if stored and stored.serial != serial:
            logger.warning(
                "Cannot update archive: %s, object: '%s', "
                "serial: %s, db_serial: %s" %
                (archive, objname, serial, stored.serial))
            return

        new_serial = db.new_serial(objname)
        db.put_state(live_state.set(serial=new_serial))
        if new_serial == 0:
            db.put_state(common.FileState(
                archive=self.SYNC, objname=objname, serial=-1, info={}))

Giorgos Korfiatis's avatar
Giorgos Korfiatis committed
162
    def decide_file_sync(self, objname, master=None, slave=None):
        """Decide whether ``objname`` needs syncing and, if so, start it.

        ``master`` and ``slave`` default to ``self.MASTER`` and
        ``self.SLAVE``. Nothing happens when the decision step raises
        ``common.DatabaseError`` or yields no states.
        """
        master = self.MASTER if master is None else master
        slave = self.SLAVE if slave is None else slave
        stamp = utils.time_stamp()
        try:
            states = self._decide_file_sync(objname, master, slave, stamp)
        except common.DatabaseError:
            return
        if states is not None:
            self.sync_file(*states)
175
176

    @transaction()
    def _decide_file_sync(self, objname, master, slave, ident):
        """Run the decision under ``@transaction()`` and record a heartbeat.

        Returns whatever ``_do_decide_file_sync`` returns. When that is not
        None, a heartbeat holding ``ident`` and a fresh time stamp is stored
        for ``objname`` before returning.
        """
        states = self._do_decide_file_sync(objname, master, slave, ident)
        if states is None:
            return states
        with self.heartbeat.lock() as hb:
            hb[objname] = {"ident": ident, "tstamp": utils.time_stamp()}
        return states

Giorgos Korfiatis's avatar
Giorgos Korfiatis committed
185
    def _do_decide_file_sync(self, objname, master, slave, ident):
        """Work out which state, if any, must be propagated for ``objname``.

        Returns ``None`` when there is nothing to do, or a tuple
        ``(source_state, target_state, sync_state)`` naming the state to
        copy, the state to overwrite and the current SYNC state.

        Raises ``AssertionError`` when the stored serials are inconsistent:
        a pending decision that matches neither archive, or an archive
        serial that is neither greater than nor equal to the SYNC serial.
        """
        db = self.get_db()
        logger.info("Deciding object: '%s'" % objname)
        master_state = db.get_state(master, objname)
        slave_state = db.get_state(slave, objname)
        sync_state = db.get_state(self.SYNC, objname)
        decision_state = db.get_state(self.DECISION, objname)

        master_serial = master_state.serial
        slave_serial = slave_state.serial
        sync_serial = sync_state.serial
        decision_serial = decision_state.serial

        # A recent heartbeat left by a different run blocks the decision.
        with self.heartbeat.lock() as hb:
            beat = hb.get(objname)
            logger.info("object: %s heartbeat: %s" %
                        (objname, beat))
            if beat is None:
                pass
            elif beat["ident"] == ident:
                logger.info("Found heartbeat with current ident %s"
                            % ident)
            else:
                if utils.younger_than(
                        beat["tstamp"], self.settings.action_max_wait):
                    self.messager.put(messaging.HeartbeatNoDecideMessage(
                        objname=objname, heartbeat=beat, logger=logger))
                    return None
                logger.warning("Ignoring previous run: %s %s" %
                               (objname, beat))

        # A decision that has not reached SYNC: repeat it, unless it is
        # recorded as failed.
        if decision_serial != sync_serial:
            with self.failed_serials.lock() as d:
                failed_sync = d.get((decision_serial, objname))
            if failed_sync is not None:
                logger.warning(
                    "Ignoring failed decision for: '%s', decision: %s" %
                    (objname, decision_serial))
            else:
                logger.warning(
                    "Already decided: '%s', decision: %s, sync: %s" %
                    (objname, decision_serial, sync_serial))
                if decision_serial == master_serial:
                    return master_state, slave_state, sync_state
                if decision_serial == slave_serial:
                    return slave_state, master_state, sync_state
                raise AssertionError(
                    "Decision serial %s for objname '%s' "
                    "does not match any archive." %
                    (decision_serial, objname))

        # Pick the side that is ahead of SYNC; master is checked first.
        if master_serial > sync_serial:
            source_state, target_state = master_state, slave_state
            source_serial = master_serial
        elif master_serial == sync_serial:
            if slave_serial > sync_serial:
                source_state, target_state = slave_state, master_state
                source_serial = slave_serial
            elif slave_serial == sync_serial:
                return None
            else:
                raise AssertionError("Slave serial %s, sync serial %s"
                                     % (slave_serial, sync_serial))
        else:
            raise AssertionError("Master serial %s, sync serial %s"
                                 % (master_serial, sync_serial))

        if source_serial == decision_serial:  # this is a failed serial
            return None
        self._make_decision_state(decision_state, source_state)
        return source_state, target_state, sync_state

Giorgos Korfiatis's avatar
Giorgos Korfiatis committed
257
    def _make_decision_state(self, decision_state, source_state):
258
        db = self.get_db()
Giorgos Korfiatis's avatar
Giorgos Korfiatis committed
259
260
261
262
        new_decision_state = decision_state.set(
            serial=source_state.serial, info=source_state.info)
        db.put_state(new_decision_state)

Giorgos Korfiatis's avatar
Giorgos Korfiatis committed
263
    def sync_file(self, source_state, target_state, sync_state):
        """Queue a ``SyncMessage`` and run the transfer in a new thread.

        The thread runs ``_sync_file`` with the three states; it is started
        and then appended to ``self.sync_threads``.
        """
        self.messager.put(messaging.SyncMessage(
            objname=source_state.objname,
            archive=source_state.archive,
            serial=source_state.serial,
            info=source_state.info,
            logger=logger))
        worker = threading.Thread(
            target=self._sync_file,
            args=(source_state, target_state, sync_state))
        worker.start()
        self.sync_threads.append(worker)
276

Giorgos Korfiatis's avatar
Giorgos Korfiatis committed
277
    def _sync_file(self, source_state, target_state, sync_state):
278
279
280
281
282
283
284
285
286
287
        clients = self.clients
        source_client = clients[source_state.archive]
        try:
            source_handle = source_client.stage_file(source_state)
        except common.SyncError as e:
            logger.warning(e)
            return
        target_client = clients[target_state.archive]
        target_client.start_pulling_file(
            source_handle, target_state, sync_state,
Giorgos Korfiatis's avatar
Giorgos Korfiatis committed
288
            callback=self.ack_file_sync,
Giorgos Korfiatis's avatar
Giorgos Korfiatis committed
289
290
            failure_callback=self.mark_as_failed)

291
    def mark_as_failed(self, state, hard=False):
Giorgos Korfiatis's avatar
Giorgos Korfiatis committed
292
        serial = state.serial
Giorgos Korfiatis's avatar
Giorgos Korfiatis committed
293
        objname = state.objname
294
        with self.heartbeat.lock() as hb:
295
            hb.pop(objname)
296
297
298
299
300
301
        if hard:
            logger.warning(
                "Marking failed serial %s for archive: %s, object: '%s'" %
                (serial, state.archive, objname))
            with self.failed_serials.lock() as d:
                d[(serial, objname)] = state
302
303
304
305
306
307
308

    def update_state(self, old_state, new_state):
        db = self.get_db()
        db.put_state(new_state)
        # here we could do any checks needed on the old state,
        # perhaps triggering a probe

Giorgos Korfiatis's avatar
Giorgos Korfiatis committed
309
    def ack_file_sync(self, synced_source_state, synced_target_state):
        """Record a completed sync and announce it.

        The new states are stored through ``_ack_file_sync``; if that raises
        ``common.DatabaseError`` nothing else is done. On success the
        object's heartbeat is removed and an ``AckSyncMessage`` is queued.
        """
        try:
            self._ack_file_sync(synced_source_state, synced_target_state)
        except common.DatabaseError:
            return
        objname = synced_source_state.objname
        with self.heartbeat.lock() as hb:
            hb.pop(objname)
        self.messager.put(messaging.AckSyncMessage(
            archive=synced_target_state.archive,
            objname=objname,
            serial=synced_source_state.serial,
            logger=logger))
323
324
325
326
327
328
329
330

    @transaction()
    def _ack_file_sync(self, synced_source_state, synced_target_state):
        """Store the states resulting from a completed sync.

        Runs under ``@transaction()``. The source state is stored as given,
        the target state is stored with the source serial, and both the SYNC
        and DECISION states are set to that serial with the merged info of
        source and target.

        Raises ``AssertionError`` if the source serial differs from the
        stored decision serial, and ``common.SyncError`` if it is not
        greater than the stored SYNC serial.
        """
        db = self.get_db()
        serial = synced_source_state.serial
        objname = synced_source_state.objname
        source = synced_source_state.archive
        target = synced_target_state.archive
        tinfo = synced_target_state.info
        logger.info("Acking archive: %s, object: '%s', serial: %s "
                    "info: %s" %
                    (target, objname, serial, tinfo))
        decision_state = db.get_state(self.DECISION, objname)
        sync_state = db.get_state(self.SYNC, objname)

        if serial != decision_state.serial:
            raise AssertionError(
                "Serial mismatch: assumed sync %s, decision %s"
                % (serial, decision_state.serial))
        if serial <= sync_state.serial:
            # The message now says "<=" to match the condition; it used to
            # say "<", which was misleading when the serials were equal.
            raise common.SyncError(
                "cannot ack: serial %s <= sync serial %s" %
                (serial, sync_state.serial))

        db_source_state = db.get_state(source, objname)
        self.update_state(db_source_state, synced_source_state)

        final_target_state = synced_target_state.set(
            serial=serial)
        db_target_state = db.get_state(target, objname)
        self.update_state(db_target_state, final_target_state)

        # Target info is applied last, so it wins over source info on keys
        # present in both.
        sync_info = dict(synced_source_state.info)
        sync_info.update(synced_target_state.info)
        # The 'info' namespace is global. Some attributes may be globally
        # recognizable by all clients with the same semantics, such as
        # a content-hash (e.g. SHA256), while other may be specific to
        # each client. Clients are responsible to protect their private
        # attributes creating their own namespace, for example
        # 'localfs_mtime', 'object_store_etag'
        new_sync_state = sync_state.set(serial=serial, info=sync_info)
        db.put_state(new_sync_state)
        new_decision_state = new_sync_state.set(archive=self.DECISION)
        db.put_state(new_decision_state)

    @transaction()
    def list_deciding(self, archives=None):
        """Return, as a set, what ``db.list_deciding`` reports.

        ``archives`` defaults to ``(self.MASTER, self.SLAVE)``; ``self.SYNC``
        is passed as the ``sync`` argument.
        """
        db = self.get_db()
        selected = (self.MASTER, self.SLAVE) if archives is None else archives
        found = db.list_deciding(archives=selected, sync=self.SYNC)
        return set(found)
375

376
    def probe_archive(self, archive, forced=False):
        """Probe every candidate file reported by the archive's client.

        ``forced`` is forwarded to the client's ``list_candidate_files``.
        After probing, the same candidates are removed from the client. A
        ``common.DatabaseError`` raised along the way is swallowed.
        """
        stamp = utils.time_stamp()
        client = self.clients[archive]
        try:
            pending = client.list_candidate_files(forced=forced)
            self._probe_files(archive, pending, stamp)
            client.remove_candidates(pending, stamp)
        except common.DatabaseError:
            pass
385

386
387
388
389
390
391
392
    def decide_archive(self, archive=None):
        try:
            archives = [archive] if archive is not None else None
            for objname in self.list_deciding(archives):
                self.decide_file_sync(objname)
        except common.DatabaseError:
            pass
393
394
395

    def decide_all_archives(self):
        """Probe both archives, then decide on every pending object."""
        logger.info("Checking candidates to sync")
        for step in (self.probe_all, self.decide_archive):
            step()
398

399
400
401
402
    def probe_all(self, forced=False):
        self.probe_archive(self.MASTER, forced=forced)
        self.probe_archive(self.SLAVE, forced=forced)

403
    def _poll_decide(self, interval=3):
        """Start a daemon that keeps calling ``decide_all_archives``.

        Each ``run_body`` call runs one decide pass and then sleeps
        ``interval`` seconds. Returns what ``utils.start_daemon`` returns.
        """
        syncer = self

        class DecideThread(utils.StoppableThread):
            def run_body(this):
                syncer.decide_all_archives()
                time.sleep(interval)

        return utils.start_daemon(DecideThread)

410
411
412
413
414
415
416
417
    # TODO cleanup db of objects deleted in all clients
    # def cleanup(self):
    #     db = self.get_db()
    #     master_deleted = set(db.list_files_with_info(MASTER, {}))
    #     client_deleted = set(db.list_files_with_info(SLAVE, {}))
    #     deleted = master_deleted.intersection(client_deleted)


Giorgos Korfiatis's avatar
Giorgos Korfiatis committed
418
419
def conf(auth_url, auth_token, container, local_root_path, **kwargs):
    """Build a ``FileSyncer`` with a Pithos master and a local-fs slave.

    The four named arguments and any extra keyword arguments are passed to
    ``SyncerSettings``; both clients are created from the resulting
    settings object.
    """
    settings = SyncerSettings(
        auth_url=auth_url, auth_token=auth_token, container=container,
        local_root_path=local_root_path, **kwargs)
    return FileSyncer(
        settings, PithosFileClient(settings), LocalfsFileClient(settings))