protocol.py 26.2 KB
Newer Older
Giorgos Korfiatis's avatar
Giorgos Korfiatis committed
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
# Copyright (C) 2015 GRNET S.A.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.

16
from wsgiref.simple_server import make_server
17
from ws4py.websocket import WebSocket
18 19 20 21
from ws4py.server.wsgiutils import WebSocketWSGIApplication
from ws4py.server.wsgirefserver import WSGIServer, WebSocketWSGIRequestHandler
from hashlib import sha1
from threading import Thread
22 23
import sqlite3
import time
24
import os
25
import sys
26 27
import json
import logging
Giorgos Korfiatis's avatar
Giorgos Korfiatis committed
28
import subprocess
29
from agkyra.syncer import (
30
    syncer, setup, pithos_client, localfs_client, messaging, utils)
31
from agkyra.config import AgkyraConfig, AGKYRA_DIR
32

33 34 35 36 37 38 39 40
# A PyInstaller bundle flags itself with sys.frozen and unpacks its files
# under sys._MEIPASS; otherwise resources live next to this source file.
ISFROZEN = bool(getattr(sys, 'frozen', False))
if ISFROZEN:
    BASEDIR = sys._MEIPASS
else:
    BASEDIR = os.path.dirname(os.path.realpath(__file__))

RESOURCES = os.path.join(BASEDIR, 'resources')

LOG = logging.getLogger(__name__)

# Registry of syncer objects, shared by all protocol instances
SYNCERS = utils.ThreadSafeDict()

# Status codes are read from the common UI data file
with open(os.path.join(RESOURCES, 'ui_data/common_en.json')) as f:
    COMMON = json.load(f)
STATUS = COMMON['STATUS']
50

51

52 53 54 55 56 57 58 59 60 61 62 63 64 65 66
def retry_on_locked_db(method, *args, **kwargs):
    """Call ``method`` and try again while the SQLite database is locked

    :param method: callable, invoked as ``method(*args, **kwargs)``
    :param wait: (keyword, default 0.2) seconds to sleep between attempts
    :param retries: (keyword, default 2) maximum number of attempts
    :returns: the result of ``method``, or None if every attempt found the
        database locked (or if no attempt was allowed)
    :raises sqlite3.OperationalError: if the error is not about a locked DB
    """
    # These two options control the retry loop only, so they are removed from
    # kwargs instead of being forwarded to ``method``
    wait = kwargs.pop('wait', 0.2)
    retries = kwargs.pop('retries', 2)
    while retries and retries > 0:
        try:
            return method(*args, **kwargs)
        except sqlite3.OperationalError as oe:
            if 'locked' not in '%s' % oe:
                raise
            LOG.debug('%s, retry' % oe)
        retries -= 1
        if retries > 0:
            # No point in sleeping after the last attempt
            time.sleep(wait)
    return None


67
class SessionHelper(object):
68 69 70
    """Agkyra Helper Server sets a WebSocket server with the Helper protocol
    It also provides methods for running and killing the Helper server
    """
71
    session_timeout = 20
72

73
    def __init__(self, **kwargs):
        """Connect to the session DB and make sure the session table exists

        :param session_db: (optional) path of the SQLite session database,
            by default 'session.db' inside AGKYRA_DIR
        :param session_relation: (optional) name of the session table, by
            default 'heart'
        """
        default_db = os.path.join(AGKYRA_DIR, 'session.db')
        self.session_db = kwargs.get('session_db', default_db)
        self.session_relation = kwargs.get('session_relation', 'heart')

        LOG.debug('Connect to db')
        self.db = sqlite3.connect(self.session_db)

        # Another process may hold the DB at this moment, so retry if locked
        retry_on_locked_db(self._init_db_relation)
83

84
    def _init_db_relation(self):
85
        """Create the session relation"""
86 87 88 89 90 91 92
        self.db.execute('BEGIN')
        self.db.execute(
            'CREATE TABLE IF NOT EXISTS %s ('
            'ui_id VARCHAR(256), address text, beat VARCHAR(32)'
            ')' % self.session_relation)
        self.db.commit()

93
    def load_active_session(self):
94 95 96 97
        """Load a session from db"""
        r = self.db.execute('SELECT * FROM %s' % self.session_relation)
        sessions = r.fetchall()
        if sessions:
98 99 100 101
            last, expected_id = sessions[-1], getattr(self, 'ui_id', None)
            if expected_id and last[0] != '%s' % expected_id:
                LOG.debug('Session ID is old')
                return None
102 103 104 105
            now, last_beat = time.time(), float(last[2])
            if now - last_beat < self.session_timeout:
                # Found an active session
                return dict(ui_id=last[0], address=last[1])
106
        LOG.debug('No active sessions found')
107 108
        return None

109
    def create_session(self):
        """Return the active session or create a new one

        If no active session is found, a WebSocket server is set up on a
        free local port, the WebSocketProtocol class is configured with the
        new session ID and DB details, and the session is recorded in the DB.
        The server is only prepared here, it is not serving yet.

        :returns: dict(ui_id=..., address=...)
        """

        def get_session():
            self.db.execute('BEGIN')
            return self.load_active_session()

        session = retry_on_locked_db(get_session)
        if session:
            # Nothing to write, close the transaction opened by get_session
            self.db.rollback()
            return session

        ui_id = sha1(os.urandom(128)).hexdigest()

        local_addr = '127.0.0.1'
        WebSocketProtocol.ui_id = ui_id
        WebSocketProtocol.session_db = self.session_db
        WebSocketProtocol.session_relation = self.session_relation
        # Port 0 lets the OS pick a free port
        server = make_server(
            local_addr, 0,
            server_class=WSGIServer,
            handler_class=WebSocketWSGIRequestHandler,
            app=WebSocketWSGIApplication(handler_cls=WebSocketProtocol))
        server.initialize_websockets_manager()
        address = 'ws://%s:%s' % (local_addr, server.server_port)

        # Values are bound as parameters; only the table name is formatted
        # in. The beat is kept as text, as the rest of the module expects.
        self.db.execute(
            'INSERT INTO %s VALUES (?, ?, ?)' % self.session_relation,
            (ui_id, address, '%s' % time.time()))
        self.db.commit()

        self.server = server
        self.ui_id = ui_id
        return dict(ui_id=ui_id, address=address)

143 144 145 146 147 148 149 150 151 152 153 154 155
    def wait_session_to_load(self, timeout=20, step=2):
        """Wait while the session is loading e.g. in another process
            :returns: the session or None if timeout
        """
        time_passed = 0
        while time_passed < timeout:
            self.session = self.load_active_session()
            if self.session:
                return self.session
            time_passed += step
            time.sleep(step)
        return None

156 157 158 159 160 161 162 163 164 165
    def wait_session_to_stop(self, timeout=20, step=2):
        """Wait while the session is shutting down
            :returns: True if stopped, False if timed out and still running
        """
        time_passed = 0
        while time_passed < timeout and self.load_active_session():
            time.sleep(step)
            time_passed += step
        return not bool(self.load_active_session())

166
    def heartbeat(self):
167
        """Periodically update the session database timestamp"""
168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185
        db, alive = sqlite3.connect(self.session_db), True
        while alive:
            time.sleep(2)
            try:
                db.execute('BEGIN')
                r = db.execute('SELECT ui_id FROM %s WHERE ui_id="%s"' % (
                    self.session_relation, self.ui_id))
                if r.fetchall():
                    db.execute('UPDATE %s SET beat="%s" WHERE ui_id="%s"' % (
                        self.session_relation, time.time(), self.ui_id))
                else:
                    alive = False
                db.commit()
            except sqlite3.OperationalError as oe:
                if 'locked' not in '%s' % oe:
                    raise
        db.close()

186 187
    def start(self):
        """Start the helper server in a thread"""
188
        if getattr(self, 'server', None):
189 190 191
            t = Thread(target=self._shutdown_daemon)
            t.start()
            Thread(target=self.heartbeat).start()
192
            self.server.serve_forever()
193 194
            t.join()
            LOG.debug('WSGI server is down')
195

196
    def _shutdown_daemon(self):
197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212
        """Shutdown WSGI server when the heart stops"""
        db = sqlite3.connect(self.session_db)
        while True:
            time.sleep(4)
            try:
                r = db.execute('SELECT ui_id FROM %s WHERE ui_id="%s"' % (
                    self.session_relation, self.ui_id))
                if not r.fetchall():
                    db.close()
                    time.sleep(5)
                    t = Thread(target=self.server.shutdown)
                    t.start()
                    t.join()
                    break
            except sqlite3.OperationalError:
                pass
213 214


215 216 217
class WebSocketProtocol(WebSocket):
    """Helper-side WebSocket protocol for communication with GUI:

218
    -- INTERNAL HANDSHAKE --
219
    GUI: {"method": "post", "ui_id": <GUI ID>}
220
    HELPER: {"ACCEPTED": 202, "action": "post ui_id"} or
        {"REJECTED": 401, "action": "post ui_id"}
222

223 224 225 226
    -- ERRORS WITH SIGNIFICANCE --
    If the token doesn't work:
    HELPER: {"action": <action that caused the error>, "UNAUTHORIZED": 401}

227 228 229 230 231 232 233
    -- SHUT DOWN --
    GUI: {"method": "post", "path": "shutdown"}

    -- PAUSE --
    GUI: {"method": "post", "path": "pause"}
    HELPER: {"OK": 200, "action": "post pause"} or error

234
    -- START --
235 236 237
    GUI: {"method": "post", "path": "start"}
    HELPER: {"OK": 200, "action": "post start"} or error

238 239 240 241
    -- FORCE START --
    GUI: {"method": "post", "path": "force"}
    HELPER: {"OK": 200, "action": "post force"} or error

242 243 244 245 246 247 248 249 250
    -- GET SETTINGS --
    GUI: {"method": "get", "path": "settings"}
    HELPER:
        {
            "action": "get settings",
            "token": <user token>,
            "url": <auth url>,
            "container": <container>,
            "directory": <local directory>,
251 252 253
            "exclude": <file path>,
            "language": <en|el>,
            "sync_on_start": <true|false>
254 255 256 257 258 259 260 261 262
        } or {<ERROR>: <ERROR CODE>}

    -- PUT SETTINGS --
    GUI: {
            "method": "put", "path": "settings",
            "token": <user token>,
            "url": <auth url>,
            "container": <container>,
            "directory": <local directory>,
263 264 265
            "exclude": <file path>,
            "language": <en|el>,
            "sync_on_start": <true|false>
266 267 268 269 270 271
        }
    HELPER: {"CREATED": 201, "action": "put settings"} or
        {<ERROR>: <ERROR CODE>, "action": "put settings"}

    -- GET STATUS --
    GUI: {"method": "get", "path": "status"}
272
    HELPER: {"code": <int>,
273
            "synced": <int>, "unsynced": <int>, "failed": <int>,
274 275
            "action": "get status"
        } or {<ERROR>: <ERROR CODE>, "action": "get status"}
276
    """
277 278
    status = utils.ThreadSafeDict()
    with status.lock() as d:
279
        d.update(code=STATUS['UNINITIALIZED'], synced=0, unsynced=0, failed=0)
280

281
    ui_id = None
282
    session_db, session_relation = None, None
283 284 285 286
    accepted = False
    settings = dict(
        token=None, url=None,
        container=None, directory=None,
287
        exclude=None, sync_on_start=True, language="en")
288
    cnf = AgkyraConfig()
289
    essentials = ('url', 'token', 'container', 'directory')
290

291 292 293 294 295 296 297 298
    def get_status(self, key=None):
        """Refresh the status, if there is a usable syncer, and report it

        :param key: (optional) a status key, e.g. 'code' or 'synced'
        :returns: a copy of the status dict, or just the value of key (None
            if there is no such key)
        """
        if self.syncer and self.can_sync():
            self._consume_messages()
            with self.status.lock() as state:
                if self.syncer.paused:
                    state['code'] = STATUS['PAUSED']
                else:
                    pausing = state['code'] == STATUS['PAUSING']
                    if not pausing or (
                            state['unsynced'] == (
                                state['synced'] + state['failed'])):
                        state['code'] = STATUS['SYNCING']
        with self.status.lock() as state:
            if key:
                return state.get(key, None)
            return dict(state)

    def set_status(self, **kwargs):
        """Update the status dict with the given key=value pairs"""
        with self.status.lock() as state:
            state.update(**kwargs)

308 309
    @property
    def syncer(self):
        """:returns: the first object found in SYNCERS, or None if empty"""
        with SYNCERS.lock() as registry:
            for first in registry.values():
                return first
        return None

316
    def clean_db(self):
        """Clean DB from current session trace

        Deletes the row of this session (self.ui_id) from the session
        relation, using a connection of its own.

        :raises sqlite3.OperationalError: e.g. if the DB is locked
        """
        LOG.debug('Remove current session trace')
        db = sqlite3.connect(self.session_db)
        try:
            db.execute('BEGIN')
            # The value is bound as a parameter; only the table name is
            # formatted in
            db.execute(
                'DELETE FROM %s WHERE ui_id=?' % self.session_relation,
                ('%s' % self.ui_id, ))
            db.commit()
        finally:
            # Close even on failure: callers retry with a fresh connection
            db.close()

326
    def shutdown_syncer(self, syncer_key=0):
        """Remove a syncer from SYNCERS and stop it

        The syncer is stopped only if the settings are enough for syncing.
        This call waits for the open sync threads to finish.

        :param syncer_key: the key of the syncer in SYNCERS
        """
        LOG.debug('Shutdown syncer')
        with SYNCERS.lock() as registry:
            removed = registry.pop(syncer_key, None)
            if not (removed and self.can_sync()):
                return
            removed.stop_all_daemons()
            LOG.debug('Wait open syncs to complete')
            removed.wait_sync_threads()

336
    def heartbeat(self):
        """Update session DB timestamp as long as session is alive

        Once per second, refresh the beat of this session (self.ui_id) in the
        session relation. When the session row is gone, shut the syncer
        down, reset the status to UNINITIALIZED and close the connection.
        DB errors are treated as temporary: the beat is tried again.
        """
        # Values are bound as parameters; only the table name is formatted in
        find_sql = 'SELECT ui_id FROM %s WHERE ui_id=?' % (
            self.session_relation)
        beat_sql = 'UPDATE %s SET beat=? WHERE ui_id=?' % (
            self.session_relation)
        db, alive = sqlite3.connect(self.session_db), True
        while alive:
            time.sleep(1)
            ui_id = '%s' % self.ui_id
            try:
                db.execute('BEGIN')
                r = db.execute(find_sql, (ui_id, ))
                if r.fetchall():
                    db.execute(beat_sql, ('%s' % time.time(), ui_id))
                else:
                    alive = False
                db.commit()
            except sqlite3.OperationalError:
                # Make sure a failed beat leaves no transaction open before
                # the next BEGIN, then keep trying
                db.rollback()
                alive = True
        db.close()
        self.shutdown_syncer()
        self.set_status(code=STATUS['UNINITIALIZED'])
        self.close()
357

358 359 360 361
    def _get_default_sync(self):
        """Get the name of the default sync

        If global.default_sync is not set, pick the first sync of the
        configuration, or the name 'default' if there are no syncs, and store
        the pick as global.default_sync.
        """
        name = self.cnf.get('global', 'default_sync')
        if name:
            return name
        name = next(iter(self.cnf.keys('sync')), name) or 'default'
        self.cnf.set('global', 'default_sync', name)
        return name

    def _get_sync_cloud(self, sync):
        """Get the name of the cloud used by a sync

        If <sync>.cloud is not set, pick the first cloud of the
        configuration, or the name 'default' if there are no clouds, and store
        the pick as <sync>.cloud for future sessions.
        """
        try:
            cloud = self.cnf.get_sync(sync, 'cloud')
        except KeyError:
            cloud = None
        if cloud:
            return cloud
        cloud = next(iter(self.cnf.keys('cloud')), cloud) or 'default'
        self.cnf.set_sync(sync, 'cloud', cloud)
        return cloud
384

385 386 387 388
    def _load_settings(self):
        """Load the settings of the default sync from the configuration

        Cloud options (url, token) that cannot be loaded are set to None.
        Whenever a cloud or sync option is missing, the status code is set to
        SETTINGS MISSING.
        """
        LOG.debug('Start loading settings')
        sync = self._get_default_sync()
        cloud = self._get_sync_cloud(sync)

        for option in ('url', 'token'):
            try:
                self.settings[option] = self.cnf.get_cloud(cloud, option)
            except Exception:
                # Reset the option that failed (and not some other option)
                self.settings[option] = None
                self.set_status(code=STATUS['SETTINGS MISSING'])

        self.settings['sync_on_start'] = (
            self.cnf.get('global', 'sync_on_start') == 'on')
        self.settings['language'] = self.cnf.get('global', 'language')

        # for option in ('container', 'directory', 'exclude'):
        for option in ('container', 'directory'):
            try:
                self.settings[option] = self.cnf.get_sync(sync, option)
            except KeyError:
                LOG.debug('No %s is set' % option)
                self.set_status(code=STATUS['SETTINGS MISSING'])

        LOG.debug('Finished loading settings')

415
    def _dump_settings(self):
        """Copy the current settings to the configuration and write it

        Cloud settings are saved only if there is a url. If the cloud of the
        sync is already set with a different url, the settings go to a new
        cloud name, made by appending '_<sync>' until the name is unused.
        """
        LOG.debug('Saving settings')
        sync = self._get_default_sync()
        dirty = False

        url = self.settings.get('url', None)
        if url:
            LOG.debug('Save cloud settings')
            cloud = self._get_sync_cloud(sync)

            try:
                known_url = self.cnf.get_cloud(cloud, 'url') or ''
            except KeyError:
                known_url = url

            if known_url and known_url != url:
                # Do not overwrite another cloud, look for an unused name
                while True:
                    cloud = '%s_%s' % (cloud, sync)
                    try:
                        self.cnf.get_cloud(cloud, 'url')
                    except KeyError:
                        break

            LOG.debug('Cloud name is %s' % cloud)
            self.cnf.set_cloud(cloud, 'url', url)
            self.cnf.set_cloud(cloud, 'token', self.settings['token'] or '')
            self.cnf.set_sync(sync, 'cloud', cloud)
            dirty = True
        else:
            LOG.debug('No cloud settings to save')

        LOG.debug('Save sync settings, name is %s' % sync)
        # for option in ('directory', 'container', 'exclude'):
        for option in ('directory', 'container'):
            self.cnf.set_sync(sync, option, self.settings[option] or '')
            dirty = True

        language = self.settings.get('language', 'en')
        self.cnf.set('global', 'language', language)
        if self.settings.get('sync_on_start', False):
            self.cnf.set('global', 'sync_on_start', 'on')
        else:
            self.cnf.set('global', 'sync_on_start', 'off')

        if dirty:
            self.cnf.write()
            LOG.debug('Settings saved')
        else:
            LOG.debug('No setting changes spotted')
460

461 462 463 464 465
    def _essentials_changed(self, new_settings):
        """Check if essential settings have changed in new_settings

        :param new_settings: dict with (some of) the settings to be applied
        :returns: True if new_settings carries a different value for at least
            one essential setting. Essentials missing from new_settings count
            as unchanged.
        """
        return any(
            e in new_settings and new_settings[e] != self.settings[e]
            for e in self.essentials)

466
    def _consume_messages(self, max_consumption=10):
        """Update status by consuming and understanding syncer messages

        Nothing is consumed if there is no syncer or if the settings are not
        enough for syncing.

        :param max_consumption: the most messages to consume in one call
        """
        syncer_ = self.syncer
        if not (syncer_ and self.can_sync()):
            return

        def increase(counter):
            # Update in place, under the status lock. Going through
            # get_status() would consume messages again, recursively.
            with self.status.lock() as d:
                d[counter] += 1

        msg = syncer_.get_next_message()
        while msg:
            if isinstance(msg, messaging.SyncMessage):
                LOG.debug('UNSYNCED +1 %s' % getattr(msg, 'objname', ''))
                increase('unsynced')
            elif isinstance(msg, messaging.AckSyncMessage):
                LOG.debug('SYNCED +1 %s' % getattr(msg, 'objname', ''))
                increase('synced')
            elif isinstance(msg, messaging.SyncErrorMessage):
                LOG.debug('FAILED +1 %s' % getattr(msg, 'objname', ''))
                increase('failed')
            elif isinstance(msg, messaging.LocalfsSyncDisabled):
                self.set_status(code=STATUS['DIRECTORY ERROR'])
                syncer_.stop_all_daemons()
            elif isinstance(msg, messaging.PithosSyncDisabled):
                self.set_status(code=STATUS['CONTAINER ERROR'])
                syncer_.stop_all_daemons()
            LOG.debug('Backend message: %s' % msg.name)
            # Limit the amount of messages consumed each time
            max_consumption -= 1
            if max_consumption <= 0:
                break
            msg = syncer_.get_next_message()
497 498 499 500 501

    def can_sync(self):
        """Check if all essential settings are set, as syncing needs them"""
        unset = [e for e in self.essentials if not self.settings[e]]
        return not unset

502 503
    def init_sync(self):
        """Initialize syncer

        Build a file syncer from the current settings and the SSL options of
        the cloud, then wait for its first two messages. If both the local
        and the remote end report enabled, a probe is initiated and the
        status code becomes SYNCING. Otherwise the status code reports the
        error.

        In any case, the sync counters are reset and the outcome is stored as
        SYNCERS[0] (None unless both ends were reported enabled).
        """
        self.set_status(code=STATUS['INITIALIZING'])
        sync = self._get_default_sync()

        kwargs = dict(agkyra_path=AGKYRA_DIR)
        # Get SSL settings
        cloud = self._get_sync_cloud(sync)
        try:
            ignore_ssl = self.cnf.get_cloud(cloud, 'ignore_ssl') in ('on', )
            kwargs['ignore_ssl'] = ignore_ssl
        except KeyError:
            ignore_ssl = None
        if not ignore_ssl:
            try:
                kwargs['ca_certs'] = self.cnf.get_cloud(cloud, 'ca_certs')
            except KeyError:
                pass

        syncer_ = None
        try:
            syncer_settings = setup.SyncerSettings(
                self.settings['url'], self.settings['token'],
                self.settings['container'], self.settings['directory'],
                **kwargs)
            master = pithos_client.PithosFileClient(syncer_settings)
            slave = localfs_client.LocalfsFileClient(syncer_settings)
            syncer_ = syncer.FileSyncer(syncer_settings, master, slave)
            self.syncer_settings = syncer_settings
            # Check if syncer is ready, by consuming messages
            local_ok, remote_ok = False, False
            for i in range(2):

                LOG.debug('Get message %s' % (i + 1))
                msg = syncer_.get_next_message(block=True)
                LOG.debug('Got message: %s' % msg)

                if isinstance(msg, messaging.LocalfsSyncDisabled):
                    self.set_status(code=STATUS['DIRECTORY ERROR'])
                    local_ok = False
                    break
                elif isinstance(msg, messaging.PithosSyncDisabled):
                    self.set_status(code=STATUS['CONTAINER ERROR'])
                    remote_ok = False
                    break
                elif isinstance(msg, messaging.LocalfsSyncEnabled):
                    local_ok = True
                elif isinstance(msg, messaging.PithosSyncEnabled):
                    remote_ok = True
                else:
                    LOG.error('Unexpected message %s' % msg)
                    self.set_status(code=STATUS['CRITICAL ERROR'])
                    break
            if local_ok and remote_ok:
                syncer_.initiate_probe()
                self.set_status(code=STATUS['SYNCING'])
            else:
                syncer_ = None
        finally:
            # Reset all three counters together, or get_status would compare
            # fresh synced/unsynced counts with failures of the past
            self.set_status(synced=0, unsynced=0, failed=0)
            with SYNCERS.lock() as d:
                d[0] = syncer_
564

565 566 567 568 569
    # Syncer-related methods
    def get_settings(self):
        """:returns: the settings dict itself, not a copy"""
        current = self.settings
        return current

    def set_settings(self, new_settings):
        """Set the settings and dump them to permanent storage if needed

        An active syncer is paused while the settings are updated. After
        that, the syncer is initialized anew if essential settings have
        changed (or if syncing was not possible before), and syncing is
        started again if it was active.

        :param new_settings: dict with (some of) the settings to update
        """
        # Prepare setting save
        could_sync = self.syncer and self.can_sync()
        was_active = False
        if could_sync and not self.syncer.paused:
            was_active = True
            self.pause_sync()
        must_reset_syncing = self._essentials_changed(new_settings)

        # save settings
        self.settings.update(new_settings)
        self._dump_settings()

        # Restart
        if self.can_sync():
            if must_reset_syncing or not could_sync:
                self.init_sync()
            # init_sync leaves no syncer behind when it fails
            if was_active and self.syncer:
                self.start_sync()

590 591 592
    def _pause_syncer(self):
        """Stop the syncer from deciding and wait open syncs to complete"""
        current = self.syncer
        current.stop_decide()
        LOG.debug('Wait open syncs to complete')
        current.wait_sync_threads()

    def pause_sync(self):
        """Stop the syncer (if any) from deciding, set status to PAUSING"""
        current = self.syncer
        if not current:
            return
        current.stop_decide()
        self.set_status(code=STATUS['PAUSING'])
601 602

    def start_sync(self):
        """Let the syncer start deciding (there must be a syncer)"""
        current = self.syncer
        current.start_decide()
605

606 607 608 609 610 611 612 613 614 615 616
    def force_sync(self):
        """Force syncing, assuming there is a directory or container problem

        Purges the DB archives through the syncer settings of the last
        init_sync, initializes the syncer anew and starts it. If there is no
        syncer after that, the status code is set to CRITICAL ERROR.
        """
        self.set_status(code=STATUS['INITIALIZING'])
        self.syncer_settings.purge_db_archives_and_enable()
        self.init_sync()
        fresh = self.syncer
        if not fresh:
            self.set_status(code=STATUS['CRITICAL ERROR'])
            return
        fresh.start_decide()
        self.set_status(code=STATUS['SYNCING'])

617 618 619 620 621 622 623 624 625 626
    def send_json(self, msg):
        """Log a message and send it over the socket, encoded as JSON"""
        LOG.debug('send: %s' % msg)
        encoded = json.dumps(msg)
        self.send(encoded)

    # Protocol handling methods
    def _post(self, r):
        """Handle POST requests

        Before the handshake, the only valid request is the one posting the
        expected ui_id; anything else is rejected and the connection is
        terminated. After it, the request path selects an action: shutdown,
        start, pause or force.
        """
        if self.accepted:
            action = r['path']
            if action == 'shutdown':
                # Clean db to cause syncer backend to shut down
                self.set_status(code=STATUS['SHUTTING DOWN'])
                retry_on_locked_db(self.clean_db)
                return
            handlers = dict(
                start=self.start_sync,
                pause=self.pause_sync,
                force=self.force_sync)
            handlers[action]()
            self.send_json({'OK': 200, 'action': 'post %s' % action})
            return

        if r['ui_id'] != self.ui_id:
            action = r.get('path', 'ui_id')
            self.send_json({'REJECTED': 401, 'action': 'post %s' % action})
            self.terminate()
            return

        # Handshake succeeded
        self.accepted = True
        Thread(target=self.heartbeat).start()
        self.send_json({'ACCEPTED': 202, 'action': 'post ui_id'})
        self._load_settings()
        if (not self.syncer) and self.can_sync():
            self.init_sync()
            if self.syncer and self.settings['sync_on_start']:
                self.start_sync()

    def _put(self, r):
        """Handle PUT requests

        The request (without its path) is applied as the new settings and is
        sent back, marked as CREATED. Requests before the handshake are
        answered with UNAUTHORIZED UI and the connection is terminated.
        """
        if not self.accepted:
            action = r['path']
            self.send_json({
                'UNAUTHORIZED UI': 401, 'action': 'put %s' % action})
            self.terminate()
            return
        LOG.debug('put %s' % r)
        action = r.pop('path')
        self.set_settings(r)
        r['CREATED'] = 201
        r['action'] = 'put %s' % action
        self.send_json(r)
666 667 668 669 670

    def _get(self, r):
        """Handle GET requests

        The request path selects what to send back: settings or status.
        Requests before the handshake are answered with UNAUTHORIZED UI and
        the connection is terminated.
        """
        action = r.pop('path')
        if not self.accepted:
            self.send_json({
                'UNAUTHORIZED UI': 401, 'action': 'get %s' % action})
            self.terminate()
        else:
            handlers = {
                'settings': self.get_settings,
                'status': self.get_status,
            }
            # Work on a copy: get_settings returns the settings dict itself,
            # and the 'action' key must not end up in the settings
            data = dict(handlers[action]())
            data['action'] = 'get %s' % action
            self.send_json(data)

    def received_message(self, message):
        """Route requests to corresponding handling methods

        The message must be a JSON object with a "method" key (post, put or
        get). Errors are reported to the peer:
        BAD REQUEST (400) for malformed or incomplete requests,
        the error of the client for setup.ClientError,
        INTERNAL ERROR (500) for anything else; the latter also terminates
        the connection.
        """
        LOG.debug('recv: %s' % message)
        try:
            r = json.loads('%s' % message)
        except ValueError as ve:
            self.send_json({'BAD REQUEST': 400})
            LOG.error('JSON ERROR: %s' % ve)
            return
        if not isinstance(r, dict):
            # Valid JSON, but not an object: there is nothing to route
            self.send_json({'BAD REQUEST': 400})
            LOG.error('JSON ERROR: request is not an object: %s' % (r, ))
            return
        # Must be bound even if the request has no method, because the error
        # handlers below refer to it
        method = None
        try:
            method = r.pop('method')
            {
                'post': self._post,
                'put': self._put,
                'get': self._get
            }[method](r)
        except KeyError as ke:
            action = '%s %s' % (method or '', r.get('path', ''))
            self.send_json({'BAD REQUEST': 400, 'action': action})
            LOG.error('KEY ERROR: %s' % ke)
        except setup.ClientError as ce:
            action = '%s %s' % (
                method, r.get('path', 'ui_id' if 'ui_id' in r else ''))
            self.send_json({'%s' % ce: ce.status, 'action': action})
            return
        except Exception as e:
            self.send_json({'INTERNAL ERROR': 500})
            reason = '%s %s' % (method or '', r)
            LOG.error('EXCEPTION (%s): %s' % (reason, e))
            self.terminate()
712 713


Giorgos Korfiatis's avatar
Giorgos Korfiatis committed
714
def launch_server(callback, debug):
    """Launch the server in a separate process

    :param callback: the program to run; it gets the argument 'server'
    :param debug: if true, the program also gets the '-d' flag
    """
    LOG.info('Start SessionHelper session')
    arguments = ['-d', 'server'] if debug else ['server']
    if utils.iswin():
        subprocess.Popen([callback] + arguments, close_fds=True)
        return
    pid = os.fork()
    if not pid:
        # Child process: it is replaced by the server program. If that fails,
        # it must exit and not carry on as a copy of the parent.
        try:
            os.execlp(callback, callback, *arguments)
        except OSError as err:
            LOG.error('Failed to launch %s: %s' % (callback, err))
        finally:
            os._exit(1)