protocol.py 26.8 KB
Newer Older
Giorgos Korfiatis's avatar
Giorgos Korfiatis committed
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
# Copyright (C) 2015 GRNET S.A.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.

16
from wsgiref.simple_server import make_server
17
from ws4py.websocket import WebSocket
18 19 20 21
from ws4py.server.wsgiutils import WebSocketWSGIApplication
from ws4py.server.wsgirefserver import WSGIServer, WebSocketWSGIRequestHandler
from hashlib import sha1
from threading import Thread
22 23
import sqlite3
import time
24
import os
25
import sys
26 27
import json
import logging
Giorgos Korfiatis's avatar
Giorgos Korfiatis committed
28
import subprocess
29
from agkyra.syncer import (
30
    syncer, setup, pithos_client, localfs_client, messaging, utils)
31
from agkyra.config import AgkyraConfig, AGKYRA_DIR
32

33 34 35 36 37 38 39 40
if getattr(sys, 'frozen', False):
    # we are running in a |PyInstaller| bundle
    BASEDIR = sys._MEIPASS
    ISFROZEN = True
else:
    # we are running in a normal Python environment
    BASEDIR = os.path.dirname(os.path.realpath(__file__))
    ISFROZEN = False
41

42
RESOURCES = os.path.join(BASEDIR, 'resources')
43

44
LOG = logging.getLogger(__name__)
45
SYNCERS = utils.ThreadSafeDict()
46

47
with open(os.path.join(RESOURCES, 'ui_data/common_en.json')) as f:
48 49
    COMMON = json.load(f)
STATUS = COMMON['STATUS']
50

51

52 53 54 55 56 57 58 59 60 61 62 63 64 65 66
def retry_on_locked_db(method, *args, **kwargs):
    """If DB is locked, wait and try again"""
    wait = kwargs.get('wait', 0.2)
    retries = kwargs.get('retries', 2)
    while retries:
        try:
            return method(*args, **kwargs)
        except sqlite3.OperationalError as oe:
            if 'locked' not in '%s' % oe:
                raise
            LOG.debug('%s, retry' % oe)
        time.sleep(wait)
        retries -= 1


67
class SessionHelper(object):
68 69 70
    """Agkyra Helper Server sets a WebSocket server with the Helper protocol
    It also provided methods for running and killing the Helper server
    """
71
    session_timeout = 20
72

73
    def __init__(self, **kwargs):
74
        """Setup the helper server"""
75 76 77 78 79 80
        self.session_db = kwargs.get(
            'session_db', os.path.join(AGKYRA_DIR, 'session.db'))
        self.session_relation = kwargs.get('session_relation', 'heart')

        LOG.debug('Connect to db')
        self.db = sqlite3.connect(self.session_db)
81

82
        retry_on_locked_db(self._init_db_relation)
83

84
    def _init_db_relation(self):
85
        """Create the session relation"""
86 87 88 89 90 91 92
        self.db.execute('BEGIN')
        self.db.execute(
            'CREATE TABLE IF NOT EXISTS %s ('
            'ui_id VARCHAR(256), address text, beat VARCHAR(32)'
            ')' % self.session_relation)
        self.db.commit()

93
    def load_active_session(self):
94 95 96 97
        """Load a session from db"""
        r = self.db.execute('SELECT * FROM %s' % self.session_relation)
        sessions = r.fetchall()
        if sessions:
98 99 100 101
            last, expected_id = sessions[-1], getattr(self, 'ui_id', None)
            if expected_id and last[0] != '%s' % expected_id:
                LOG.debug('Session ID is old')
                return None
102 103 104 105
            now, last_beat = time.time(), float(last[2])
            if now - last_beat < self.session_timeout:
                # Found an active session
                return dict(ui_id=last[0], address=last[1])
106
        LOG.debug('No active sessions found')
107 108
        return None

109
    def create_session(self):
110 111 112 113 114 115 116 117 118 119 120
        """Return the active session or create a new one"""

        def get_session():
                self.db.execute('BEGIN')
                return self.load_active_session()

        session = retry_on_locked_db(get_session)
        if session:
            self.db.rollback()
            return session

121 122
        ui_id = sha1(os.urandom(128)).hexdigest()

Giorgos Korfiatis's avatar
Giorgos Korfiatis committed
123
        LOCAL_ADDR = '127.0.0.1'
124
        WebSocketProtocol.ui_id = ui_id
125 126
        WebSocketProtocol.session_db = self.session_db
        WebSocketProtocol.session_relation = self.session_relation
127
        server = make_server(
Giorgos Korfiatis's avatar
Giorgos Korfiatis committed
128
            LOCAL_ADDR, 0,
129 130 131 132
            server_class=WSGIServer,
            handler_class=WebSocketWSGIRequestHandler,
            app=WebSocketWSGIApplication(handler_cls=WebSocketProtocol))
        server.initialize_websockets_manager()
Giorgos Korfiatis's avatar
Giorgos Korfiatis committed
133
        address = 'ws://%s:%s' % (LOCAL_ADDR, server.server_port)
134

135 136 137 138
        self.db.execute('INSERT INTO %s VALUES ("%s", "%s", "%s")' % (
            self.session_relation, ui_id, address, time.time()))
        self.db.commit()

139
        self.server = server
140
        self.ui_id = ui_id
141 142
        return dict(ui_id=ui_id, address=address)

143 144 145 146 147 148 149 150 151 152 153 154 155
    def wait_session_to_load(self, timeout=20, step=2):
        """Wait while the session is loading e.g. in another process
            :returns: the session or None if timeout
        """
        time_passed = 0
        while time_passed < timeout:
            self.session = self.load_active_session()
            if self.session:
                return self.session
            time_passed += step
            time.sleep(step)
        return None

156 157 158 159 160 161 162 163 164 165
    def wait_session_to_stop(self, timeout=20, step=2):
        """Wait while the session is shutting down
            :returns: True if stopped, False if timed out and still running
        """
        time_passed = 0
        while time_passed < timeout and self.load_active_session():
            time.sleep(step)
            time_passed += step
        return not bool(self.load_active_session())

166
    def heartbeat(self):
167
        """Periodically update the session database timestamp"""
168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185
        db, alive = sqlite3.connect(self.session_db), True
        while alive:
            time.sleep(2)
            try:
                db.execute('BEGIN')
                r = db.execute('SELECT ui_id FROM %s WHERE ui_id="%s"' % (
                    self.session_relation, self.ui_id))
                if r.fetchall():
                    db.execute('UPDATE %s SET beat="%s" WHERE ui_id="%s"' % (
                        self.session_relation, time.time(), self.ui_id))
                else:
                    alive = False
                db.commit()
            except sqlite3.OperationalError as oe:
                if 'locked' not in '%s' % oe:
                    raise
        db.close()

186 187
    def start(self):
        """Start the helper server in a thread"""
188
        if getattr(self, 'server', None):
189 190 191
            t = Thread(target=self._shutdown_daemon)
            t.start()
            Thread(target=self.heartbeat).start()
192
            self.server.serve_forever()
193 194
            t.join()
            LOG.debug('WSGI server is down')
195

196
    def _shutdown_daemon(self):
197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212
        """Shutdown WSGI server when the heart stops"""
        db = sqlite3.connect(self.session_db)
        while True:
            time.sleep(4)
            try:
                r = db.execute('SELECT ui_id FROM %s WHERE ui_id="%s"' % (
                    self.session_relation, self.ui_id))
                if not r.fetchall():
                    db.close()
                    time.sleep(5)
                    t = Thread(target=self.server.shutdown)
                    t.start()
                    t.join()
                    break
            except sqlite3.OperationalError:
                pass
213 214


215 216 217



218 219 220
class WebSocketProtocol(WebSocket):
    """Helper-side WebSocket protocol for communication with GUI:

221
    -- INTERNAL HANDSAKE --
222
    GUI: {"method": "post", "ui_id": <GUI ID>}
223
    HELPER: {"ACCEPTED": 202, "action": "post ui_id"}" or
224
        "{"REJECTED": 401, "action": "post ui_id"}
225

226 227 228 229
    -- ERRORS WITH SIGNIFICANCE --
    If the token doesn't work:
    HELPER: {"action": <action that caused the error>, "UNAUTHORIZED": 401}

230 231 232 233 234 235 236
    -- SHUT DOWN --
    GUI: {"method": "post", "path": "shutdown"}

    -- PAUSE --
    GUI: {"method": "post", "path": "pause"}
    HELPER: {"OK": 200, "action": "post pause"} or error

237
    -- START --
238 239 240
    GUI: {"method": "post", "path": "start"}
    HELPER: {"OK": 200, "action": "post start"} or error

241 242 243 244
    -- FORCE START --
    GUI: {"method": "post", "path": "force"}
    HELPER: {"OK": 200, "action": "post force"} or error

245 246 247 248 249 250 251 252 253
    -- GET SETTINGS --
    GUI: {"method": "get", "path": "settings"}
    HELPER:
        {
            "action": "get settings",
            "token": <user token>,
            "url": <auth url>,
            "container": <container>,
            "directory": <local directory>,
254 255 256
            "exclude": <file path>,
            "language": <en|el>,
            "sync_on_start": <true|false>
257 258 259 260 261 262 263 264 265
        } or {<ERROR>: <ERROR CODE>}

    -- PUT SETTINGS --
    GUI: {
            "method": "put", "path": "settings",
            "token": <user token>,
            "url": <auth url>,
            "container": <container>,
            "directory": <local directory>,
266 267 268
            "exclude": <file path>,
            "language": <en|el>,
            "sync_on_start": <true|false>
269 270 271 272 273 274
        }
    HELPER: {"CREATED": 201, "action": "put settings",} or
        {<ERROR>: <ERROR CODE>, "action": "get settings",}

    -- GET STATUS --
    GUI: {"method": "get", "path": "status"}
275
    HELPER: {"code": <int>,
276
            "synced": <int>, "unsynced": <int>, "failed": <int>,
277 278
            "action": "get status"
        } or {<ERROR>: <ERROR CODE>, "action": "get status"}
279
    """
280 281
    status = utils.ThreadSafeDict()
    with status.lock() as d:
282
        d.update(code=STATUS['UNINITIALIZED'], synced=0, unsynced=0, failed=0)
283

284
    ui_id = None
285
    session_db, session_relation = None, None
286 287 288 289
    accepted = False
    settings = dict(
        token=None, url=None,
        container=None, directory=None,
290
        exclude=None, sync_on_start=True, language="en")
291
    cnf = AgkyraConfig()
292
    essentials = ('url', 'token', 'container', 'directory')
293

294 295 296 297 298
    def get_status(self, key=None):
        """:return: updated status dict or value of specified key"""
        if self.syncer and self.can_sync():
            self._consume_messages()
            with self.status.lock() as d:
299 300 301 302 303 304 305 306
                LOG.debug('Status was %s' % d['code'])
                if d['code'] in (
                        STATUS['UNINITIALIZED'], STATUS['INITIALIZING']):
                    if self.syncer.paused:
                        d['code'] = STATUS['PAUSED']
                    elif d['code'] != STATUS['PAUSING'] or (
                            d['unsynced'] == d['synced'] + d['failed']):
                        d['code'] = STATUS['SYNCING']
307
        with self.status.lock() as d:
308
            LOG.debug('Status is now %s' % d['code'])
309 310 311 312
            return d.get(key, None) if key else dict(d)

    def set_status(self, **kwargs):
        with self.status.lock() as d:
313
            LOG.debug('CHANGING STATUS TO %s' % kwargs)
314 315
            d.update(kwargs)

316 317
    @property
    def syncer(self):
318
        """:returns: the first syncer object or None"""
319 320 321 322 323
        with SYNCERS.lock() as d:
            for sync_key, sync_obj in d.items():
                return sync_obj
        return None

324
    def clean_db(self):
325 326
        """Clean DB from current session trace"""
        LOG.debug('Remove current session trace')
327
        db = sqlite3.connect(self.session_db)
328
        db.execute('BEGIN')
329 330
        db.execute('DELETE FROM %s WHERE ui_id="%s"' % (
            self.session_relation, self.ui_id))
331 332 333
        db.commit()
        db.close()

334
    def shutdown_syncer(self, syncer_key=0):
335
        """Shutdown the syncer backend object"""
336 337 338 339 340 341 342 343
        LOG.debug('Shutdown syncer')
        with SYNCERS.lock() as d:
            syncer = d.pop(syncer_key, None)
            if syncer and self.can_sync():
                syncer.stop_all_daemons()
                LOG.debug('Wait open syncs to complete')
                syncer.wait_sync_threads()

344
    def heartbeat(self):
345
        """Update session DB timestamp as long as session is alive"""
346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361
        db, alive = sqlite3.connect(self.session_db), True
        while alive:
            time.sleep(1)
            try:
                db.execute('BEGIN')
                r = db.execute('SELECT ui_id FROM %s WHERE ui_id="%s"' % (
                    self.session_relation, self.ui_id))
                if r.fetchall():
                    db.execute('UPDATE %s SET beat="%s" WHERE ui_id="%s"' % (
                        self.session_relation, time.time(), self.ui_id))
                else:
                    alive = False
                db.commit()
            except sqlite3.OperationalError:
                alive = True
        db.close()
362
        self.shutdown_syncer()
363
        self.set_status(code=STATUS['UNINITIALIZED'])
364
        self.close()
365

366 367 368 369
    def _get_default_sync(self):
        """Get global.default_sync or pick the first sync as default
        If there are no syncs, create a 'default' sync.
        """
370
        sync = self.cnf.get('global', 'default_sync')
371 372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 389 390 391
        if not sync:
            for sync in self.cnf.keys('sync'):
                break
            self.cnf.set('global', 'default_sync', sync or 'default')
        return sync or 'default'

    def _get_sync_cloud(self, sync):
        """Get the <sync>.cloud or pick the first cloud and use it
        In case of cloud picking, set the cloud as the <sync>.cloud for future
        sessions.
        If no clouds are found, create a 'default' cloud, with an empty url.
        """
        try:
            cloud = self.cnf.get_sync(sync, 'cloud')
        except KeyError:
            cloud = None
        if not cloud:
            for cloud in self.cnf.keys('cloud'):
                break
            self.cnf.set_sync(sync, 'cloud', cloud or 'default')
        return cloud or 'default'
392

393 394 395 396
    def _load_settings(self):
        LOG.debug('Start loading settings')
        sync = self._get_default_sync()
        cloud = self._get_sync_cloud(sync)
397 398

        try:
399 400 401
            self.settings['url'] = self.cnf.get_cloud(cloud, 'url')
        except Exception:
            self.settings['url'] = None
402
            self.set_status(code=STATUS['SETTINGS MISSING'])
403 404 405 406
        try:
            self.settings['token'] = self.cnf.get_cloud(cloud, 'token')
        except Exception:
            self.settings['url'] = None
407
            self.set_status(code=STATUS['SETTINGS MISSING'])
408

409 410 411 412
        self.settings['sync_on_start'] = (
            self.cnf.get('global', 'sync_on_start') == 'on')
        self.settings['language'] = self.cnf.get('global', 'language')

413 414
        # for option in ('container', 'directory', 'exclude'):
        for option in ('container', 'directory'):
415 416 417 418
            try:
                self.settings[option] = self.cnf.get_sync(sync, option)
            except KeyError:
                LOG.debug('No %s is set' % option)
419
                self.set_status(code=STATUS['SETTINGS MISSING'])
420

421 422
        LOG.debug('Finished loading settings')

423
    def _dump_settings(self):
424
        LOG.debug('Saving settings')
425
        sync = self._get_default_sync()
426
        changes = False
427

428 429 430 431 432
        if not self.settings.get('url', None):
            LOG.debug('No cloud settings to save')
        else:
            LOG.debug('Save cloud settings')
            cloud = self._get_sync_cloud(sync)
433 434

            try:
435
                old_url = self.cnf.get_cloud(cloud, 'url') or ''
436
            except KeyError:
437 438 439 440 441 442 443 444 445 446 447 448 449 450 451 452
                old_url = self.settings['url']

            while old_url and old_url != self.settings['url']:
                cloud = '%s_%s' % (cloud, sync)
                try:
                    self.cnf.get_cloud(cloud, 'url')
                except KeyError:
                    break

            LOG.debug('Cloud name is %s' % cloud)
            self.cnf.set_cloud(cloud, 'url', self.settings['url'])
            self.cnf.set_cloud(cloud, 'token', self.settings['token'] or '')
            self.cnf.set_sync(sync, 'cloud', cloud)
            changes = True

        LOG.debug('Save sync settings, name is %s' % sync)
453 454
        # for option in ('directory', 'container', 'exclude'):
        for option in ('directory', 'container'):
455
            self.cnf.set_sync(sync, option, self.settings[option] or '')
456
            changes = True
457

458 459 460 461 462
        self.cnf.set('global', 'language', self.settings.get('language', 'en'))
        sync_on_start = self.settings.get('sync_on_start', False)
        self.cnf.set(
            'global', 'sync_on_start', 'on' if sync_on_start else 'off')

463 464 465 466 467
        if changes:
            self.cnf.write()
            LOG.debug('Settings saved')
        else:
            LOG.debug('No setting changes spotted')
468

469 470 471 472 473
    def _essentials_changed(self, new_settings):
        """Check if essential settings have changed in new_settings"""
        return all([
            self.settings[e] == self.settings[e] for e in self.essentials])

474
    def _consume_messages(self, max_consumption=10):
475
        """Update status by consuming and understanding syncer messages"""
476 477
        if self.can_sync():
            msg = self.syncer.get_next_message()
478 479 480 481
            # if not msg:
            #     with self.status.lock() as d:
            #         if d['unsynced'] == d['synced'] + d['failed']:
            #             d.update(unsynced=0, synced=0, failed=0)
482
            while msg:
483
                if isinstance(msg, messaging.SyncMessage):
484
                    LOG.debug('UNSYNCED +1 %s' % getattr(msg, 'objname', ''))
485
                    self.set_status(unsynced=self.get_status('unsynced') + 1)
486
                elif isinstance(msg, messaging.AckSyncMessage):
487
                    LOG.debug('SYNCED +1 %s' % getattr(msg, 'objname', ''))
488
                    self.set_status(synced=self.get_status('synced') + 1)
489
                elif isinstance(msg, messaging.SyncErrorMessage):
490
                    LOG.debug('FAILED +1 %s' % getattr(msg, 'objname', ''))
491
                    self.set_status(failed=self.get_status('failed') + 1)
492
                elif isinstance(msg, messaging.LocalfsSyncDisabled):
493 494
                    LOG.debug('STOP BACKEND, %s'% getattr(msg, 'objname', ''))
                    LOG.debug('CHANGE STATUS TO: %s' % STATUS['DIRECTORY ERROR'])
495
                    self.set_status(code=STATUS['DIRECTORY ERROR'])
496 497
                    self.syncer.stop_all_daemons()
                elif isinstance(msg, messaging.PithosSyncDisabled):
498
                    LOG.debug('STOP BACKEND, %s'% getattr(msg, 'objname', ''))
499
                    self.set_status(code=STATUS['CONTAINER ERROR'])
500
                    self.syncer.stop_all_daemons()
501
                LOG.debug('Backend message: %s %s' % (msg.name, type(msg)))
502 503
                # Limit the amount of messages consumed each time
                max_consumption -= 1
504
                if max_consumption:
505
                    msg = self.syncer.get_next_message()
506 507
                else:
                    break
508 509 510 511 512

    def can_sync(self):
        """Check if settings are enough to setup a syncing proccess"""
        return all([self.settings[e] for e in self.essentials])

513 514
    def init_sync(self):
        """Initialize syncer"""
515
        self.set_status(code=STATUS['INITIALIZING'])
516
        sync = self._get_default_sync()
517 518 519 520 521 522 523 524 525 526 527 528 529 530 531

        kwargs = dict(agkyra_path=AGKYRA_DIR)
        # Get SSL settings
        cloud = self._get_sync_cloud(sync)
        try:
            ignore_ssl = self.cnf.get_cloud(cloud, 'ignore_ssl') in ('on', )
            kwargs['ignore_ssl'] = ignore_ssl
        except KeyError:
            ignore_ssl = None
        if not ignore_ssl:
            try:
                kwargs['ca_certs'] = self.cnf.get_cloud(cloud, 'ca_certs')
            except KeyError:
                pass

532
        syncer_ = None
533 534 535 536 537 538 539
        try:
            syncer_settings = setup.SyncerSettings(
                self.settings['url'], self.settings['token'],
                self.settings['container'], self.settings['directory'],
                **kwargs)
            master = pithos_client.PithosFileClient(syncer_settings)
            slave = localfs_client.LocalfsFileClient(syncer_settings)
540
            syncer_ = syncer.FileSyncer(syncer_settings, master, slave)
541
            self.syncer_settings = syncer_settings
542 543 544 545 546 547 548 549
            # Check if syncer is ready, by consuming messages
            local_ok, remote_ok = False, False
            for i in range(2):

                LOG.debug('Get message %s' % (i + 1))
                msg = syncer_.get_next_message(block=True)
                LOG.debug('Got message: %s' % msg)

550
                if isinstance(msg, messaging.LocalfsSyncDisabled):
551 552 553
                    self.set_status(code=STATUS['DIRECTORY ERROR'])
                    local_ok = False
                    break
554
                elif isinstance(msg, messaging.PithosSyncDisabled):
555
                    self.set_status(code=STATUS['CONTAINER ERROR'])
556 557 558 559 560 561
                    remote_ok = False
                    break
                elif isinstance(msg, messaging.LocalfsSyncEnabled):
                    local_ok = True
                elif isinstance(msg, messaging.PithosSyncEnabled):
                    remote_ok = True
562
                else:
563 564 565 566
                    LOG.error('Unexpected message %s' % msg)
                    self.set_status(code=STATUS['CRITICAL ERROR'])
                    break
            if local_ok and remote_ok:
567
                syncer_.initiate_probe()
568 569 570
                self.set_status(code=STATUS['SYNCING'])
            else:
                syncer_ = None
571
        finally:
572
            self.set_status(synced=0, unsynced=0)
573 574
            with SYNCERS.lock() as d:
                d[0] = syncer_
575

576 577 578 579 580
    # Syncer-related methods
    def get_settings(self):
        return self.settings

    def set_settings(self, new_settings):
581
        """Set the settings and dump them to permanent storage if needed"""
582
        # Prepare setting save
583
        could_sync = self.syncer and self.can_sync()
584 585 586
        was_active = False
        if could_sync and not self.syncer.paused:
            was_active = True
587 588 589 590
            self.pause_sync()
        must_reset_syncing = self._essentials_changed(new_settings)

        # save settings
591
        self.settings.update(new_settings)
592 593
        self._dump_settings()

594 595 596 597
        # Restart
        if self.can_sync():
            if must_reset_syncing or not could_sync:
                self.init_sync()
598
            if was_active:
599 600
                self.start_sync()

601 602 603
    def _pause_syncer(self):
        syncer_ = self.syncer
        syncer_.stop_decide()
604
        LOG.debug('Wait open syncs to complete')
605 606 607
        syncer_.wait_sync_threads()

    def pause_sync(self):
608
        """Pause syncing (assuming it is up and running)"""
609
        if self.syncer:
610
            self.set_status(code=STATUS['PAUSING'])
611 612
            self.syncer.stop_decide()
            self.set_status(code=STATUS['PAUSED'])
613 614

    def start_sync(self):
615
        """Start syncing"""
616
        self.syncer.start_decide()
617
        self.set_status(code=STATUS['SYNCING'])
618

619 620 621 622 623 624 625 626 627 628 629
    def force_sync(self):
        """Force syncing, assuming there is a directory or container problem"""
        self.set_status(code=STATUS['INITIALIZING'])
        self.syncer_settings.purge_db_archives_and_enable()
        self.init_sync()
        if self.syncer:
            self.syncer.start_decide()
            self.set_status(code=STATUS['SYNCING'])
        else:
            self.set_status(code=STATUS['CRITICAL ERROR'])

630 631 632 633 634 635 636 637 638 639
    def send_json(self, msg):
        LOG.debug('send: %s' % msg)
        self.send(json.dumps(msg))

    # Protocol handling methods
    def _post(self, r):
        """Handle POST requests"""
        if self.accepted:
            action = r['path']
            if action == 'shutdown':
640 641
                # Clean db to cause syncer backend to shut down
                self.set_status(code=STATUS['SHUTTING DOWN'])
642
                retry_on_locked_db(self.clean_db)
643 644
                # self._shutdown()
                # self.terminate()
645 646 647
                return
            {
                'start': self.start_sync,
648 649
                'pause': self.pause_sync,
                'force': self.force_sync
650 651
            }[action]()
            self.send_json({'OK': 200, 'action': 'post %s' % action})
652
        elif r['ui_id'] == self.ui_id:
653
            self.accepted = True
654
            Thread(target=self.heartbeat).start()
655
            self.send_json({'ACCEPTED': 202, 'action': 'post ui_id'})
656
            self._load_settings()
657
            if (not self.syncer) and self.can_sync():
658
                self.init_sync()
659
                if self.syncer and self.settings['sync_on_start']:
660
                    self.start_sync()
661
        else:
662
            action = r.get('path', 'ui_id')
663 664 665 666 667
            self.send_json({'REJECTED': 401, 'action': 'post %s' % action})
            self.terminate()

    def _put(self, r):
        """Handle PUT requests"""
668
        if self.accepted:
669 670 671 672 673
            LOG.debug('put %s' % r)
            action = r.pop('path')
            self.set_settings(r)
            r.update({'CREATED': 201, 'action': 'put %s' % action})
            self.send_json(r)
674 675
        else:
            action = r['path']
676 677
            self.send_json({
                'UNAUTHORIZED UI': 401, 'action': 'put %s' % action})
678
            self.terminate()
679 680 681 682 683

    def _get(self, r):
        """Handle GET requests"""
        action = r.pop('path')
        if not self.accepted:
684 685
            self.send_json({
                'UNAUTHORIZED UI': 401, 'action': 'get %s' % action})
686 687 688 689 690 691 692 693 694 695 696 697 698 699 700 701 702 703 704 705 706 707 708 709 710 711
            self.terminate()
        else:
            data = {
                'settings': self.get_settings,
                'status': self.get_status,
            }[action]()
            data['action'] = 'get %s' % action
            self.send_json(data)

    def received_message(self, message):
        """Route requests to corresponding handling methods"""
        LOG.debug('recv: %s' % message)
        try:
            r = json.loads('%s' % message)
        except ValueError as ve:
            self.send_json({'BAD REQUEST': 400})
            LOG.error('JSON ERROR: %s' % ve)
            return
        try:
            method = r.pop('method')
            {
                'post': self._post,
                'put': self._put,
                'get': self._get
            }[method](r)
        except KeyError as ke:
712 713
            action = method + ' ' + r.get('path', '')
            self.send_json({'BAD REQUEST': 400, 'action': action})
714
            LOG.error('KEY ERROR: %s' % ke)
715 716 717 718 719
        except setup.ClientError as ce:
            action = '%s %s' % (
                method, r.get('path', 'ui_id' if 'ui_id' in r else ''))
            self.send_json({'%s' % ce: ce.status, 'action': action})
            return
720 721
        except Exception as e:
            self.send_json({'INTERNAL ERROR': 500})
722 723
            reason = '%s %s' % (method or '', r)
            LOG.error('EXCEPTION (%s): %s' % (reason, e))
724
            self.terminate()
725 726


Giorgos Korfiatis's avatar
Giorgos Korfiatis committed
727
def launch_server(callback, debug):
728 729
    """Launch the server in a separate process"""
    LOG.info('Start SessionHelper session')
Giorgos Korfiatis's avatar
Giorgos Korfiatis committed
730
    if utils.iswin():
Giorgos Korfiatis's avatar
Giorgos Korfiatis committed
731 732 733
        command = [callback]
        if debug:
            command.append('-d')
734 735
        command.append("start daemon")
        subprocess.Popen(command, close_fds=True)
Giorgos Korfiatis's avatar
Giorgos Korfiatis committed
736 737 738
    else:
        pid = os.fork()
        if not pid:
Giorgos Korfiatis's avatar
Giorgos Korfiatis committed
739 740 741
            command = [callback, callback]
            if debug:
                command.append('-d')
742
            command.append("start daemon")
Giorgos Korfiatis's avatar
Giorgos Korfiatis committed
743
            os.execlp(*command)