syncer.py 16 KB
Newer Older
Giorgos Korfiatis's avatar
Giorgos Korfiatis committed
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
# Copyright (C) 2015 GRNET S.A.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.

16
17
18
19
20
21
22
23
24
import time
import threading
import logging

from agkyra.syncer import common
from agkyra.syncer.setup import SyncerSettings
from agkyra.syncer.database import transaction
from agkyra.syncer.localfs_client import LocalfsFileClient
from agkyra.syncer.pithos_client import PithosFileClient
25
from agkyra.syncer import messaging, utils
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40

# Module-level logger, named after this module.
logger = logging.getLogger(name=__name__)


class FileSyncer(object):
    """Synchronize objects between a master and a slave file client.

    Besides one archive per client (named after the client's ``SIGNATURE``),
    the database keeps two bookkeeping archives per object:

    * ``SYNC`` -- the state last acknowledged as synced on both sides;
    * ``DECISION`` -- the state chosen for propagation by the latest
      decision.  It equals the ``SYNC`` state once that sync is acked.

    Probing records live changes under fresh serials; deciding compares the
    serials of master, slave and ``SYNC`` and starts a sync thread; the
    thread's callbacks either ack the sync or mark its serial as failed.
    """

    dbname = None
    clients = None

    def __init__(self, settings, master, slave):
        """Wire the syncer to ``settings`` and to the two file clients.

        ``settings`` must provide ``get_db``, ``messager``, ``heartbeat``
        and ``action_max_wait``; ``master`` and ``slave`` must expose a
        ``SIGNATURE`` that is used as their archive name.
        """
        self.settings = settings
        self.master = master
        self.slave = slave
        # Names of the bookkeeping archives stored alongside the clients'.
        self.DECISION = 'DECISION'
        self.SYNC = 'SYNC'
        self.MASTER = master.SIGNATURE
        self.SLAVE = slave.SIGNATURE
        self.get_db = settings.get_db
        self.clients = {self.MASTER: master, self.SLAVE: slave}
        self.notifiers = {}
        self.decide_thread = None
        # Handles of sync threads that may still be running.
        self.sync_threads = []
        # (serial, objname) -> state, for syncs reported via mark_as_failed.
        self.failed_serials = utils.ThreadSafeDict()
        self.messager = settings.messager
        # objname -> {"ident": ..., "tstamp": ...} for syncs in progress.
        self.heartbeat = self.settings.heartbeat

    def thread_is_active(self, t):
        """Return True if ``t`` is not None and is alive, else False."""
        return bool(t and t.is_alive())

    @property
    def decide_active(self):
        """True while the periodic decide thread is running."""
        return self.thread_is_active(self.decide_thread)

    @property
    def paused(self):
        """True when the periodic decide thread is not running."""
        return not self.decide_active

    def initiate_probe(self):
        """Start the change notifiers and force a probe of both archives."""
        self.start_notifiers()
        self.probe_all(forced=True)

    def start_notifiers(self):
        """Create a notifier for every client whose notifier is not alive."""
        # items() instead of the Python-2-only iteritems(); only two entries.
        for signature, client in self.clients.items():
            notifier = self.notifiers.get(signature)
            if not self.thread_is_active(notifier):
                self.notifiers[signature] = client.notifier()
            else:
                logger.info("Notifier %s already up", signature)

    def stop_notifiers(self):
        """Ask every notifier created so far to stop."""
        for notifier in self.notifiers.values():
            notifier.stop()

    def start_decide(self):
        """Launch the periodic decide thread unless it is already running."""
        if not self.decide_active:
            self.decide_thread = self._poll_decide()

    def stop_decide(self):
        """Ask the periodic decide thread to stop, if it is running."""
        if self.decide_active:
            self.decide_thread.stop()

    def stop_all_daemons(self):
        """Stop the decide thread and all notifiers."""
        self.stop_decide()
        self.stop_notifiers()

    def wait_sync_threads(self):
        """Block until every tracked sync thread has finished."""
        for thread in self.sync_threads:
            thread.join()

    def get_next_message(self, block=False):
        """Return the next message from the messager (see ``messager.get``).
        """
        return self.messager.get(block=block)

    def probe_file(self, archive, objname):
        """Probe one object of ``archive`` and drop it from the candidates.

        A ``common.DatabaseError`` is swallowed; in that case the object is
        not removed from the client's candidates (presumably so that a later
        probe retries it).
        """
        ident = utils.time_stamp()
        try:
            self._probe_files(archive, [objname], ident)
            client = self.clients[archive]
            client.remove_candidates([objname], ident)
        except common.DatabaseError:
            pass

    @transaction()
    def _probe_files(self, archive, objnames, ident):
        """Probe several objects of ``archive`` within one transaction."""
        for objname in objnames:
            self._do_probe_file(archive, objname, ident)

    def _do_probe_file(self, archive, objname, ident):
        """Probe a single object and record its live state if it changed.

        The probe is skipped when the object has a heartbeat younger than
        ``settings.action_max_wait`` (a sync is in progress) or when its
        stored serial differs from the ``SYNC`` serial (an earlier change
        has not been synced yet).
        """
        logger.info("Probing archive: %s, object: '%s'", archive, objname)
        db = self.get_db()
        client = self.clients[archive]
        db_state = db.get_state(archive, objname)
        ref_state = db.get_state(self.SYNC, objname)
        with self.heartbeat.lock() as hb:
            beat = hb.get(objname)
            if beat is not None:
                if utils.younger_than(
                        beat["tstamp"], self.settings.action_max_wait):
                    logger.warning("Object '%s' is being synced; "
                                   "Probe aborted.", objname)
                    return
        if db_state.serial != ref_state.serial:
            logger.warning("Serial mismatch in probing archive: %s, "
                           "object: '%s'", archive, objname)
            return
        live_state = client.probe_file(objname, db_state, ref_state, ident)
        if live_state is not None:
            self.update_file_state(live_state)

    def update_file_state(self, live_state):
        """Store a probed ``live_state`` under a newly allocated serial.

        An ``UpdateMessage`` is always emitted first.  If the stored state
        of the archive no longer has the serial the probe was based on, the
        update is abandoned with a warning.
        """
        db = self.get_db()
        archive = live_state.archive
        objname = live_state.objname
        serial = live_state.serial
        msg = messaging.UpdateMessage(
            archive=archive, objname=objname, serial=serial, logger=logger)
        self.messager.put(msg)
        db_state = db.get_state(archive, objname)
        if db_state and db_state.serial != serial:
            logger.warning(
                "Cannot update archive: %s, object: '%s', "
                "serial: %s, db_serial: %s",
                archive, objname, serial, db_state.serial)
            return

        new_serial = db.new_serial(objname)
        new_state = live_state.set(serial=new_serial)
        db.put_state(new_state)
        if new_serial == 0:
            # First serial of this object: create a SYNC state at -1 so the
            # new state compares as newer than the sync point when deciding.
            sync_state = common.FileState(
                archive=self.SYNC, objname=objname, serial=-1,
                info={})
            db.put_state(sync_state)

    def decide_file_sync(self, objname, master=None, slave=None):
        """Decide whether ``objname`` must be synced and, if so, start it.

        ``master``/``slave`` default to this syncer's archives.  A
        ``common.DatabaseError`` during the decision aborts silently.
        """
        if master is None:
            master = self.MASTER
        if slave is None:
            slave = self.SLAVE
        ident = utils.time_stamp()
        try:
            states = self._decide_file_sync(objname, master, slave, ident)
        except common.DatabaseError:
            return
        if states is None:
            return
        self.sync_file(*states)

    @transaction()
    def _decide_file_sync(self, objname, master, slave, ident):
        """Run the decision in a transaction and heartbeat a positive one.

        Returns ``(source_state, target_state, sync_state)`` or None.  On a
        positive decision a heartbeat ``{"ident", "tstamp"}`` is recorded so
        that probes and other decisions back off while the sync runs.
        """
        states = self._do_decide_file_sync(objname, master, slave, ident)
        if states is not None:
            with self.heartbeat.lock() as hb:
                beat = {"ident": ident, "tstamp": utils.time_stamp()}
                hb[objname] = beat
        return states

    def _do_decide_file_sync(self, objname, master, slave, ident):
        """Compare serials and pick the sync direction for ``objname``.

        Returns ``(source_state, target_state, sync_state)`` when a sync is
        needed, otherwise None.

        Raises:
            AssertionError: if the serials are mutually inconsistent.
        """
        db = self.get_db()
        logger.info("Deciding object: '%s'", objname)
        master_state = db.get_state(master, objname)
        slave_state = db.get_state(slave, objname)
        sync_state = db.get_state(self.SYNC, objname)
        decision_state = db.get_state(self.DECISION, objname)
        master_serial = master_state.serial
        slave_serial = slave_state.serial
        sync_serial = sync_state.serial
        decision_serial = decision_state.serial

        with self.heartbeat.lock() as hb:
            beat = hb.get(objname)
            logger.info("object: %s heartbeat: %s", objname, beat)
            if beat is not None:
                if beat["ident"] == ident:
                    logger.info("Found heartbeat with current ident %s",
                                ident)
                else:
                    if utils.younger_than(
                            beat["tstamp"], self.settings.action_max_wait):
                        logger.warning(
                            "Object '%s' already handled; aborting.",
                            objname)
                        return None
                    logger.warning("Ignoring previous run: %s %s",
                                   objname, beat)

        if decision_serial != sync_serial:
            # A previous decision was never acked (ack sets SYNC and
            # DECISION to the same serial): redo it unless it was marked
            # as failed.
            with self.failed_serials.lock() as d:
                failed_sync = d.get((decision_serial, objname))
            if failed_sync is None:
                logger.warning(
                    "Already decided: '%s', decision: %s, sync: %s",
                    objname, decision_serial, sync_serial)
                if decision_serial == master_serial:
                    return master_state, slave_state, sync_state
                elif decision_serial == slave_serial:
                    return slave_state, master_state, sync_state
                else:
                    raise AssertionError(
                        "Decision serial %s for objname '%s' "
                        "does not match any archive." %
                        (decision_serial, objname))
            else:
                logger.warning(
                    "Ignoring failed decision for: '%s', decision: %s",
                    objname, decision_serial)

        if master_serial > sync_serial:
            if master_serial == decision_serial:  # this is a failed serial
                return None
            self._make_decision_state(decision_state, master_state)
            return master_state, slave_state, sync_state
        elif master_serial == sync_serial:
            if slave_serial > sync_serial:
                if slave_serial == decision_serial:  # this is a failed serial
                    return None
                self._make_decision_state(decision_state, slave_state)
                return slave_state, master_state, sync_state
            elif slave_serial == sync_serial:
                return None
            else:
                raise AssertionError("Slave serial %s, sync serial %s"
                                     % (slave_serial, sync_serial))
        else:
            raise AssertionError("Master serial %s, sync serial %s"
                                 % (master_serial, sync_serial))

    def _make_decision_state(self, decision_state, source_state):
        """Record ``source_state``'s serial and info as the DECISION state."""
        db = self.get_db()
        new_decision_state = decision_state.set(
            serial=source_state.serial, info=source_state.info)
        db.put_state(new_decision_state)

    def sync_file(self, source_state, target_state, sync_state):
        """Announce the sync and run it in a new (non-daemon) thread."""
        msg = messaging.SyncMessage(
            objname=source_state.objname,
            archive=source_state.archive,
            serial=source_state.serial,
            info=source_state.info,
            logger=logger)
        self.messager.put(msg)
        thread = threading.Thread(
            target=self._sync_file,
            args=(source_state, target_state, sync_state))
        thread.start()
        # Forget finished threads so the list does not grow for the whole
        # lifetime of the syncer; joining a finished thread is a no-op
        # anyway, so wait_sync_threads() loses nothing.
        self.sync_threads = [t for t in self.sync_threads if t.is_alive()]
        self.sync_threads.append(thread)

    def _sync_file(self, source_state, target_state, sync_state):
        """Stage the source and let the target client pull it.

        The target client is given ``ack_file_sync`` as success callback and
        ``mark_as_failed`` as failure callback.
        """
        clients = self.clients
        source_client = clients[source_state.archive]
        try:
            source_handle = source_client.stage_file(source_state)
        except common.SyncError as e:
            # NOTE(review): the heartbeat set at decision time is left in
            # place here, so the object is only re-decided once it is older
            # than settings.action_max_wait -- confirm this back-off is
            # intended.
            logger.warning(e)
            return
        target_client = clients[target_state.archive]
        target_client.start_pulling_file(
            source_handle, target_state, sync_state,
            callback=self.ack_file_sync,
            failure_callback=self.mark_as_failed)

    def mark_as_failed(self, state):
        """Release the object's heartbeat and remember its serial as failed.
        """
        serial = state.serial
        objname = state.objname
        logger.warning(
            "Marking failed serial %s for archive: %s, object: '%s'",
            serial, state.archive, objname)
        with self.heartbeat.lock() as hb:
            # The entry may already be gone (e.g. released by a newer run);
            # a KeyError here would kill the callback thread.
            hb.pop(objname, None)
        with self.failed_serials.lock() as d:
            d[(serial, objname)] = state

    def update_state(self, old_state, new_state):
        """Store ``new_state``; ``old_state`` is currently unused."""
        db = self.get_db()
        db.put_state(new_state)
        # here we could do any checks needed on the old state,
        # perhaps triggering a probe

    def ack_file_sync(self, synced_source_state, synced_target_state):
        """Commit a completed sync, release the heartbeat and announce it.

        On ``common.DatabaseError`` nothing else is done (the heartbeat is
        kept and no message is sent).
        """
        try:
            self._ack_file_sync(synced_source_state, synced_target_state)
        except common.DatabaseError:
            return
        serial = synced_source_state.serial
        objname = synced_source_state.objname
        target = synced_target_state.archive
        with self.heartbeat.lock() as hb:
            # Tolerate a heartbeat already removed by another run.
            hb.pop(objname, None)
        msg = messaging.AckSyncMessage(
            archive=target, objname=objname, serial=serial,
            logger=logger)
        self.messager.put(msg)

    @transaction()
    def _ack_file_sync(self, synced_source_state, synced_target_state):
        """Persist the synced states and advance SYNC and DECISION.

        Raises:
            AssertionError: if the synced serial is not the decided one.
            common.SyncError: if the serial is not newer than SYNC's.
        """
        db = self.get_db()
        serial = synced_source_state.serial
        objname = synced_source_state.objname
        source = synced_source_state.archive
        target = synced_target_state.archive
        tinfo = synced_target_state.info
        logger.info("Acking archive: %s, object: '%s', serial: %s "
                    "info: %s", target, objname, serial, tinfo)
        decision_state = db.get_state(self.DECISION, objname)
        sync_state = db.get_state(self.SYNC, objname)

        if serial != decision_state.serial:
            raise AssertionError(
                "Serial mismatch: assumed sync %s, decision %s"
                % (serial, decision_state.serial))
        if serial <= sync_state.serial:
            raise common.SyncError(
                "cannot ack: serial %s <= sync serial %s" %
                (serial, sync_state.serial))

        db_source_state = db.get_state(source, objname)
        self.update_state(db_source_state, synced_source_state)

        # The target now holds the source's content, so it gets its serial.
        final_target_state = synced_target_state.set(
            serial=serial)
        db_target_state = db.get_state(target, objname)
        self.update_state(db_target_state, final_target_state)

        sync_info = dict(synced_source_state.info)
        sync_info.update(synced_target_state.info)
        # The 'info' namespace is global. Some attributes may be globally
        # recognizable by all clients with the same semantics, such as
        # a content-hash (e.g. SHA256), while other may be specific to
        # each client. Clients are responsible to protect their private
        # attributes creating their own namespace, for example
        # 'localfs_mtime', 'object_store_etag'
        new_sync_state = sync_state.set(serial=serial, info=sync_info)
        db.put_state(new_sync_state)
        new_decision_state = new_sync_state.set(archive=self.DECISION)
        db.put_state(new_decision_state)

    @transaction()
    def list_deciding(self, archives=None):
        """Return the set of object names the db lists as needing a decision.

        ``archives`` defaults to both the master and the slave archive.
        """
        db = self.get_db()
        if archives is None:
            archives = (self.MASTER, self.SLAVE)
        return set(db.list_deciding(archives=archives,
                                    sync=self.SYNC))

    def probe_archive(self, archive, forced=False):
        """Probe every candidate file of ``archive``.

        Candidates are removed from the client only after a successful
        probe; a ``common.DatabaseError`` is swallowed.
        """
        ident = utils.time_stamp()
        client = self.clients[archive]
        try:
            candidates = client.list_candidate_files(forced=forced)
            self._probe_files(archive, candidates, ident)
            client.remove_candidates(candidates, ident)
        except common.DatabaseError:
            pass

    def decide_archive(self, archive=None):
        """Run a decision for every object of ``archive`` (default: both).

        A ``common.DatabaseError`` stops the pass silently.
        """
        try:
            archives = [archive] if archive is not None else None
            for objname in self.list_deciding(archives):
                self.decide_file_sync(objname)
        except common.DatabaseError:
            pass

    def decide_all_archives(self):
        """Probe both archives, then decide every pending object."""
        logger.info("Checking candidates to sync")
        self.probe_all()
        self.decide_archive()

    def probe_all(self, forced=False):
        """Probe the master archive, then the slave archive."""
        self.probe_archive(self.MASTER, forced=forced)
        self.probe_archive(self.SLAVE, forced=forced)

    def _poll_decide(self, interval=3):
        """Start a daemon that runs decide_all_archives every ``interval`` s.
        """
        class DecideThread(utils.StoppableThread):
            # The thread instance is named ``this`` so that ``self`` keeps
            # referring to the enclosing FileSyncer.
            def run_body(this):
                self.decide_all_archives()
                time.sleep(interval)

        return utils.start_daemon(DecideThread)

    # TODO cleanup db of objects deleted in all clients
    # def cleanup(self):
    #     db = self.get_db()
    #     master_deleted = set(db.list_files_with_info(MASTER, {}))
    #     client_deleted = set(db.list_files_with_info(SLAVE, {}))
    #     deleted = master_deleted.intersection(client_deleted)


Giorgos Korfiatis's avatar
Giorgos Korfiatis committed
413
414
def conf(auth_url, auth_token, container, local_root_path, **kwargs):
    """Build a FileSyncer between a Pithos container and a local directory.

    All arguments, including any extra ``kwargs``, are forwarded to
    ``SyncerSettings``.  The Pithos client is used as master and the local
    filesystem client as slave.

    Returns:
        FileSyncer: a syncer that has not been started yet.
    """
    settings = SyncerSettings(auth_url=auth_url,
                              auth_token=auth_token,
                              container=container,
                              local_root_path=local_root_path,
                              **kwargs)
    master = PithosFileClient(settings)
    slave = LocalfsFileClient(settings)
    return FileSyncer(settings, master, slave)