# Copyright (C) 2015 GRNET S.A.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.

import time
import threading
import logging

from agkyra.syncer import common
from agkyra.syncer.setup import SyncerSettings
from agkyra.syncer.database import transaction
from agkyra.syncer.localfs_client import LocalfsFileClient
from agkyra.syncer.pithos_client import PithosFileClient
25
from agkyra.syncer import messaging, utils

# Module-level logger, named after this module.
logger = logging.getLogger(__name__)


class FileSyncer(object):
    """Coordinate synchronization of objects between two archive clients.

    Every object is tracked in the database under four archive names: the
    signatures of the two clients (``MASTER`` and ``SLAVE``) and the two
    bookkeeping archives ``SYNC`` and ``DECISION``. Each stored state
    carries a ``serial``; comparing the serials of these four states is
    what drives probing, deciding and acknowledging a sync.
    """

    dbname = None
    clients = None

    def __init__(self, settings, master, slave):
        """Wire the syncer to its settings and its two file clients.

        :param settings: object providing ``get_db``, ``messager``,
            ``heartbeat`` and ``action_max_wait``.
        :param master: file client; its ``SIGNATURE`` names the master
            archive.
        :param slave: file client; its ``SIGNATURE`` names the slave
            archive.
        """
        self.settings = settings
        self.master = master
        self.slave = slave
        self.DECISION = 'DECISION'
        self.SYNC = 'SYNC'
        self.MASTER = master.SIGNATURE
        self.SLAVE = slave.SIGNATURE
        self.get_db = settings.get_db
        self.clients = {self.MASTER: master, self.SLAVE: slave}
        self.notifiers = {}
        self.decide_thread = None
        # NOTE(review): threads are appended by sync_file() and never
        # removed, so this list grows for the lifetime of the syncer.
        self.sync_threads = []
        # Maps (serial, objname) to the state whose sync failed.
        self.failed_serials = utils.ThreadSafeDict()
        self.messager = settings.messager
        self.heartbeat = self.settings.heartbeat

    def thread_is_active(self, t):
        """Return True if `t` is set and reports itself alive."""
        return bool(t and t.is_alive())

    @property
    def decide_active(self):
        """Whether the decide thread exists and is alive."""
        return self.thread_is_active(self.decide_thread)

    @property
    def paused(self):
        """Whether the decide thread is not running."""
        return not self.decide_active

    def initiate_probe(self):
        """Start the notifiers and run a forced probe of both archives."""
        self.start_notifiers()
        self.probe_all(forced=True)

    def start_notifiers(self):
        """Start a notifier for every client that has no active one."""
        # items() rather than the Python 2-only iteritems().
        for signature, client in self.clients.items():
            notifier = self.notifiers.get(signature)
            if not self.thread_is_active(notifier):
                self.notifiers[signature] = client.notifier()
            else:
                logger.info("Notifier %s already up", signature)

    def stop_notifiers(self):
        """Ask every registered notifier to stop."""
        for notifier in self.notifiers.values():
            notifier.stop()

    def start_decide(self):
        """Start the decide thread unless one is already running."""
        if not self.decide_active:
            self.decide_thread = self._poll_decide()

    def stop_decide(self):
        """Ask the decide thread to stop, if it is running."""
        if self.decide_active:
            self.decide_thread.stop()

    def stop_all_daemons(self):
        """Stop the decide thread and all notifiers."""
        self.stop_decide()
        self.stop_notifiers()

    def wait_sync_threads(self):
        """Block until every thread recorded by sync_file() has ended."""
        for thread in self.sync_threads:
            thread.join()

    def get_next_message(self, block=False):
        """Fetch the next message from the messager.

        :param block: forwarded to ``messager.get``.
        """
        return self.messager.get(block=block)

    @transaction()
    def probe_file(self, archive, objname):
        """Probe a single object of `archive` inside a transaction."""
        return self._probe_file(archive, objname)

    def _probe_file(self, archive, objname):
        """Probe `objname` in `archive` and store any state change found.

        The probe is skipped when a heartbeat younger than
        ``settings.action_max_wait`` exists for the object, or when the
        archive's stored serial differs from the SYNC serial.
        """
        logger.info("Probing archive: %s, object: '%s'", archive, objname)
        db = self.get_db()
        client = self.clients[archive]
        db_state = db.get_state(archive, objname)
        ref_state = db.get_state(self.SYNC, objname)
        with self.heartbeat.lock() as hb:
            beat = hb.get(objname)
            if beat is not None:
                if utils.younger_than(
                        beat["tstamp"], self.settings.action_max_wait):
                    logger.warning("Object '%s' already handled; "
                                   "Probe aborted.", objname)
                    return
        if db_state.serial != ref_state.serial:
            logger.warning("Serial mismatch in probing archive: %s, "
                           "object: '%s'", archive, objname)
            return
        live_state = client.probe_file(objname, db_state, ref_state)
        if live_state is not None:
            self.update_file_state(live_state)

    def update_file_state(self, live_state):
        """Store a freshly probed state under a new serial.

        An UpdateMessage is queued before any check is made. The update
        is refused (with a warning) when the stored state of the same
        archive carries a serial other than ``live_state.serial``.
        """
        db = self.get_db()
        archive = live_state.archive
        objname = live_state.objname
        serial = live_state.serial
        msg = messaging.UpdateMessage(
            archive=archive, objname=objname, serial=serial, logger=logger)
        self.messager.put(msg)
        db_state = db.get_state(archive, objname)
        if db_state and db_state.serial != serial:
            logger.warning(
                "Cannot update archive: %s, object: '%s', "
                "serial: %s, db_serial: %s",
                archive, objname, serial, db_state.serial)
            return

        new_serial = db.new_serial(objname)
        new_state = live_state.set(serial=new_serial)
        db.put_state(new_state)
        if new_serial == 0:
            # Serial 0 also gets an initial SYNC state with serial -1;
            # presumably this is the first sighting of the object --
            # confirm against db.new_serial.
            sync_state = common.FileState(
                archive=self.SYNC, objname=objname, serial=-1,
                info={})
            db.put_state(sync_state)

    def decide_file_sync(self, objname, master=None, slave=None):
        """Decide whether `objname` needs syncing and, if so, start it.

        :param master: archive name; defaults to ``self.MASTER``.
        :param slave: archive name; defaults to ``self.SLAVE``.
        """
        if master is None:
            master = self.MASTER
        if slave is None:
            slave = self.SLAVE
        ident = utils.time_stamp()
        states = self._decide_file_sync(objname, master, slave, ident)
        if states is None:
            return
        self.sync_file(*states)

    @transaction()
    def _decide_file_sync(self, objname, master, slave, ident):
        """Decide inside a transaction and register a heartbeat.

        A heartbeat tagged with `ident` is stored for the object only
        when a sync was decided.

        :returns: ``(source_state, target_state, sync_state)`` or None.
        """
        states = self._do_decide_file_sync(objname, master, slave, ident)
        if states is not None:
            with self.heartbeat.lock() as hb:
                beat = {"ident": ident, "tstamp": utils.time_stamp()}
                hb[objname] = beat
        return states

    def _do_decide_file_sync(self, objname, master, slave, ident):
        """Compare the stored serials and pick a sync direction.

        :returns: ``(source_state, target_state, sync_state)`` when a
            sync must run, None when there is nothing to do or another
            run holds a recent heartbeat for the object.
        :raises AssertionError: when the serials are in a combination
            the algorithm does not expect.
        """
        db = self.get_db()
        logger.info("Deciding object: '%s'", objname)
        master_state = db.get_state(master, objname)
        slave_state = db.get_state(slave, objname)
        sync_state = db.get_state(self.SYNC, objname)
        decision_state = db.get_state(self.DECISION, objname)
        master_serial = master_state.serial
        slave_serial = slave_state.serial
        sync_serial = sync_state.serial
        decision_serial = decision_state.serial

        with self.heartbeat.lock() as hb:
            beat = hb.get(objname)
            logger.info("object: %s heartbeat: %s", objname, beat)
            if beat is not None:
                if beat["ident"] == ident:
                    logger.info("Found heartbeat with current ident %s",
                                ident)
                else:
                    if utils.younger_than(
                            beat["tstamp"], self.settings.action_max_wait):
                        logger.warning(
                            "Object '%s' already handled; aborting.",
                            objname)
                        return None
                    logger.warning("Ignoring previous run: %s %s",
                                   objname, beat)

        if decision_serial != sync_serial:
            # A decision exists that has not been acknowledged yet.
            with self.failed_serials.lock() as d:
                failed_sync = d.get((decision_serial, objname))
            if failed_sync is None:
                # Not marked as failed: replay the pending decision.
                logger.warning(
                    "Already decided: '%s', decision: %s, sync: %s",
                    objname, decision_serial, sync_serial)
                if decision_serial == master_serial:
                    return master_state, slave_state, sync_state
                elif decision_serial == slave_serial:
                    return slave_state, master_state, sync_state
                else:
                    raise AssertionError(
                        "Decision serial %s for objname '%s' "
                        "does not match any archive." %
                        (decision_serial, objname))
            else:
                logger.warning(
                    "Ignoring failed decision for: '%s', decision: %s",
                    objname, decision_serial)

        if master_serial > sync_serial:
            self._make_decision_state(decision_state, master_state)
            return master_state, slave_state, sync_state
        elif master_serial == sync_serial:
            if slave_serial > sync_serial:
                self._make_decision_state(decision_state, slave_state)
                return slave_state, master_state, sync_state
            elif slave_serial == sync_serial:
                return None
            else:
                raise AssertionError("Slave serial %s, sync serial %s"
                                     % (slave_serial, sync_serial))

        else:
            raise AssertionError("Master serial %s, sync serial %s"
                                 % (master_serial, sync_serial))

    def _make_decision_state(self, decision_state, source_state):
        """Store a DECISION state carrying the source's serial and info."""
        db = self.get_db()
        new_decision_state = decision_state.set(
            serial=source_state.serial, info=source_state.info)
        db.put_state(new_decision_state)

    def sync_file(self, source_state, target_state, sync_state):
        """Queue a SyncMessage and run the sync in a new thread.

        The started thread is appended to ``self.sync_threads``.
        """
        msg = messaging.SyncMessage(
            objname=source_state.objname,
            archive=source_state.archive,
            serial=source_state.serial,
            info=source_state.info,
            logger=logger)
        self.messager.put(msg)
        thread = threading.Thread(
            target=self._sync_file,
            args=(source_state, target_state, sync_state))
        thread.start()
        self.sync_threads.append(thread)

    def _sync_file(self, source_state, target_state, sync_state):
        """Stage the source object and hand it to the target client.

        A ``common.SyncError`` raised while staging is logged and ends
        the sync. Otherwise the target client is given ``ack_file_sync``
        as its callback and ``mark_as_failed`` as its failure callback.
        """
        clients = self.clients
        source_client = clients[source_state.archive]
        try:
            source_handle = source_client.stage_file(source_state)
        except common.SyncError as e:
            logger.warning(e)
            return
        target_client = clients[target_state.archive]
        target_client.start_pulling_file(
            source_handle, target_state, sync_state,
            callback=self.ack_file_sync,
            failure_callback=self.mark_as_failed)

    def mark_as_failed(self, state):
        """Record `state` as a failed sync and release its heartbeat."""
        serial = state.serial
        objname = state.objname
        logger.warning(
            "Marking failed serial %s for archive: %s, object: '%s'",
            serial, state.archive, objname)
        # Record the failure before clearing the heartbeat: a concurrent
        # decide that no longer sees the heartbeat must already see the
        # failed serial, otherwise it would replay the failed decision.
        with self.failed_serials.lock() as d:
            d[(serial, objname)] = state
        with self.heartbeat.lock() as hb:
            # Tolerate a heartbeat that has already been removed.
            hb.pop(objname, None)

    def update_state(self, old_state, new_state):
        """Store `new_state`; `old_state` is currently unused."""
        db = self.get_db()
        db.put_state(new_state)
        # here we could do any checks needed on the old state,
        # perhaps triggering a probe

    def ack_file_sync(self, synced_source_state, synced_target_state):
        """Acknowledge a completed sync.

        Commits the new states, releases the object's heartbeat and
        queues an AckSyncMessage for the target archive.
        """
        self._ack_file_sync(synced_source_state, synced_target_state)
        serial = synced_source_state.serial
        objname = synced_source_state.objname
        target = synced_target_state.archive
        with self.heartbeat.lock() as hb:
            # Tolerate a heartbeat that has already been removed.
            hb.pop(objname, None)
        msg = messaging.AckSyncMessage(
            archive=target, objname=objname, serial=serial,
            logger=logger)
        self.messager.put(msg)

    @transaction()
    def _ack_file_sync(self, synced_source_state, synced_target_state):
        """Write the outcome of a completed sync to the database.

        Source, target, SYNC and DECISION states all end up carrying
        the source's serial.

        :raises AssertionError: if the source serial is not the one
            currently decided for the object.
        :raises common.SyncError: if the source serial is not greater
            than the current SYNC serial.
        """
        db = self.get_db()
        serial = synced_source_state.serial
        objname = synced_source_state.objname
        source = synced_source_state.archive
        target = synced_target_state.archive
        tinfo = synced_target_state.info
        logger.info("Acking archive: %s, object: '%s', serial: %s "
                    "info: %s",
                    target, objname, serial, tinfo)
        decision_state = db.get_state(self.DECISION, objname)
        sync_state = db.get_state(self.SYNC, objname)

        if serial != decision_state.serial:
            raise AssertionError(
                "Serial mismatch: assumed sync %s, decision %s"
                % (serial, decision_state.serial))
        if serial <= sync_state.serial:
            # The message states the same comparison as the check.
            raise common.SyncError(
                "cannot ack: serial %s <= sync serial %s" %
                (serial, sync_state.serial))

        db_source_state = db.get_state(source, objname)
        self.update_state(db_source_state, synced_source_state)

        final_target_state = synced_target_state.set(
            serial=serial)
        db_target_state = db.get_state(target, objname)
        self.update_state(db_target_state, final_target_state)

        sync_info = dict(synced_source_state.info)
        sync_info.update(synced_target_state.info)
        # The 'info' namespace is global. Some attributes may be globally
        # recognizable by all clients with the same semantics, such as
        # a content-hash (e.g. SHA256), while other may be specific to
        # each client. Clients are responsible to protect their private
        # attributes creating their own namespace, for example
        # 'localfs_mtime', 'object_store_etag'
        new_sync_state = sync_state.set(serial=serial, info=sync_info)
        db.put_state(new_sync_state)
        new_decision_state = new_sync_state.set(archive=self.DECISION)
        db.put_state(new_decision_state)

    @transaction()
    def list_deciding(self, archives=None):
        """Return the set of objects the database lists for deciding.

        :param archives: archive names to consider; defaults to
            ``(self.MASTER, self.SLAVE)``.
        """
        db = self.get_db()
        if archives is None:
            archives = (self.MASTER, self.SLAVE)
        return set(db.list_deciding(archives=archives,
                                    sync=self.SYNC))

    @transaction()
    def probe_archive(self, archive, forced=False):
        """Probe every candidate file reported by the archive's client.

        :param forced: forwarded to ``client.list_candidate_files``.
        """
        client = self.clients[archive]
        candidates = client.list_candidate_files(forced=forced)
        for objname in candidates:
            self._probe_file(archive, objname)

    def decide_archive(self, archive):
        """Run decide_file_sync() for each deciding object of `archive`."""
        for objname in self.list_deciding([archive]):
            self.decide_file_sync(objname)

    def decide_all_archives(self):
        """Probe both archives, then decide on every pending object."""
        logger.info("Checking candidates to sync")
        self.probe_all()
        for objname in self.list_deciding():
            self.decide_file_sync(objname)

    def probe_all(self, forced=False):
        """Probe the master archive and then the slave archive."""
        self.probe_archive(self.MASTER, forced=forced)
        self.probe_archive(self.SLAVE, forced=forced)

    def _poll_decide(self, interval=3):
        """Start and return a daemon that decides in a loop.

        :param interval: seconds to sleep after each decide pass.
        """
        class DecideThread(utils.StoppableThread):
            # The thread instance is named `this` so that `self` keeps
            # referring to the enclosing FileSyncer.
            def run_body(this):
                self.decide_all_archives()
                time.sleep(interval)
        return utils.start_daemon(DecideThread)

    # TODO cleanup db of objects deleted in all clients
    # def cleanup(self):
    #     db = self.get_db()
    #     master_deleted = set(db.list_files_with_info(MASTER, {}))
    #     client_deleted = set(db.list_files_with_info(SLAVE, {}))
    #     deleted = master_deleted.intersection(client_deleted)


def conf(instance, auth_url, auth_token, container, local_root_path, **kwargs):
    """Build a FileSyncer from connection and path parameters.

    The named arguments and any extra keyword arguments are forwarded to
    SyncerSettings. The resulting settings are shared by a
    PithosFileClient, used as master, and a LocalfsFileClient, used as
    slave.
    """
    settings = SyncerSettings(
        instance=instance,
        auth_url=auth_url,
        auth_token=auth_token,
        container=container,
        local_root_path=local_root_path,
        **kwargs)
    # Arguments are evaluated left to right, so the master client is
    # still constructed before the slave client.
    return FileSyncer(
        settings, PithosFileClient(settings), LocalfsFileClient(settings))