protocol.py 25.6 KB
Newer Older
Giorgos Korfiatis's avatar
Giorgos Korfiatis committed
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
# Copyright (C) 2015 GRNET S.A.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.

16
from wsgiref.simple_server import make_server
17
from ws4py.websocket import WebSocket
18 19 20 21
from ws4py.server.wsgiutils import WebSocketWSGIApplication
from ws4py.server.wsgirefserver import WSGIServer, WebSocketWSGIRequestHandler
from hashlib import sha1
from threading import Thread
22
import time
23
import os
24
import sys
25 26
import json
import logging
Giorgos Korfiatis's avatar
Giorgos Korfiatis committed
27
import subprocess
28
from agkyra.syncer import (
29 30
    syncer, setup, pithos_client, localfs_client, messaging, utils, database,
    common)
31
from agkyra.config import AgkyraConfig, AGKYRA_DIR
32

33 34 35 36 37 38 39 40
# Locate the directory that holds the bundled resources
if getattr(sys, 'frozen', False):
    # we are running in a |PyInstaller| bundle
    BASEDIR = sys._MEIPASS
    ISFROZEN = True
else:
    # we are running in a normal Python environment
    BASEDIR = os.path.dirname(os.path.realpath(__file__))
    ISFROZEN = False

RESOURCES = os.path.join(BASEDIR, 'resources')

LOGGER = logging.getLogger(__name__)
# Syncer objects shared by all protocol instances (keyed by syncer id)
SYNCERS = utils.ThreadSafeDict()

# Status codes are read from the UI data shipped with the resources
with open(os.path.join(RESOURCES, 'ui_data/common_en.json')) as f:
    COMMON = json.load(f)
STATUS = COMMON['STATUS']
50

51

52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81
class SessionDB(database.DB):
    """Session database holding one ``heart`` table of UI heartbeats"""

    def init(self):
        """Create the ``heart`` table, unless it is already there"""
        self.db.execute(
            'CREATE TABLE IF NOT EXISTS heart ('
            'ui_id VARCHAR(256), address text, beat VARCHAR(32)'
            ')')

    def get_all_heartbeats(self):
        """:returns: every row of the ``heart`` table"""
        return self.db.execute('SELECT * FROM heart').fetchall()

    def register_heartbeat(self, ui_id, address):
        """Insert a heartbeat row for ui_id, stamped with the current time"""
        row = (ui_id, address, time.time())
        self.db.execute('INSERT INTO heart VALUES (?, ?, ?)', row)

    def update_heartbeat(self, ui_id):
        """Refresh the timestamp of ui_id

        :returns: True if ui_id was registered (and refreshed), else False
        """
        conn = self.db
        cursor = conn.execute(
            'SELECT ui_id FROM heart WHERE ui_id=?', (ui_id,))
        if not cursor.fetchall():
            return False
        conn.execute('UPDATE heart SET beat=? WHERE ui_id=?',
                     (time.time(), ui_id))
        return True

    def unregister_heartbeat(self, ui_id):
        """Remove every heartbeat row of ui_id"""
        self.db.execute('DELETE FROM heart WHERE ui_id=?', (ui_id,))
82 83


84
class SessionHelper(object):
    """Enables creation of a session daemon and retrieves credentials of an
    existing one
    """
    # Seconds since the last heartbeat after which a session counts as dead
    session_timeout = 20

    def __init__(self, **kwargs):
        """Setup the helper server

        :param session_db: (keyword, optional) path of the session database,
            defaults to 'session.db' inside AGKYRA_DIR
        """
        db_name = kwargs.get(
            'session_db', os.path.join(AGKYRA_DIR, 'session.db'))
        self.session_db = common.DBTuple(dbtype=SessionDB, dbname=db_name)
        database.initialize(self.session_db)

    def load_active_session(self):
        """Load the active session in a transaction of its own

        :returns: dict(ui_id=..., address=...) or None
        """
        with database.TransactedConnection(self.session_db) as db:
            return self._load_active_session(db)

    def _load_active_session(self, db):
        """Load a session from db

        Only the last registered heartbeat is examined.

        :param db: an open session database connection
        :returns: dict(ui_id=..., address=...) if the last session is still
            beating (and matches self.ui_id, when that is set), else None
        """
        sessions = db.get_all_heartbeats()
        if sessions:
            last, expected_id = sessions[-1], getattr(self, 'ui_id', None)
            if expected_id and last[0] != '%s' % expected_id:
                LOGGER.debug('Session ID is old')
                return None
            # The beat is stored as text, thus the float conversion
            now, last_beat = time.time(), float(last[2])
            if now - last_beat < self.session_timeout:
                # Found an active session
                return dict(ui_id=last[0], address=last[1])
        LOGGER.debug('No active sessions found')
        return None

    def create_session_daemon(self):
        """Create and return a new daemon, or None if one exists"""
        # Check and registration share one transaction
        with database.TransactedConnection(self.session_db) as db:
            session = self._load_active_session(db)
            if session:
                return None
            session_daemon = SessionDaemon(self.session_db)
            db.register_heartbeat(session_daemon.ui_id, session_daemon.address)
            return session_daemon

    def wait_session_to_load(self, timeout=20, step=0.2):
        """Wait while the session is loading e.g. in another process

        :param timeout: seconds to keep polling
        :param step: seconds to sleep between polls
        :returns: the session or None if timeout
        """
        time_passed = 0
        while time_passed < timeout:
            self.session = self.load_active_session()
            if self.session:
                return self.session
            time_passed += step
            time.sleep(step)
        return None

    def wait_session_to_stop(self, timeout=20, step=2):
        """Wait while the session is shutting down

        :param timeout: seconds to keep polling
        :param step: seconds to sleep between polls
        :returns: True if stopped, False if timed out and still running
        """
        time_passed = 0
        while time_passed < timeout and self.load_active_session():
            time.sleep(step)
            time_passed += step
        return not bool(self.load_active_session())

149 150 151 152 153

class SessionDaemon(object):
    """A WebSocket server which inspects a heartbeat and decides whether to
    shut down
    """

    def __init__(self, session_db, *args, **kwargs):
        """Prepare a WebSocket server bound to a local port

        :param session_db: the session database descriptor, used by the
            heartbeat loop and published to WebSocketProtocol
        """
        self.session_db = session_db
        ui_id = sha1(os.urandom(128)).hexdigest()

        LOCAL_ADDR = '127.0.0.1'
        # Only the handler class is handed to the WSGI application, so the
        # session credentials are published as class attributes
        WebSocketProtocol.ui_id = ui_id
        WebSocketProtocol.session_db = session_db
        # Port 0 is requested; the actual port is read back from the server
        server = make_server(
            LOCAL_ADDR, 0,
            server_class=WSGIServer,
            handler_class=WebSocketWSGIRequestHandler,
            app=WebSocketWSGIApplication(handler_cls=WebSocketProtocol))
        server.initialize_websockets_manager()
        address = 'ws://%s:%s' % (LOCAL_ADDR, server.server_port)
        self.server = server
        self.ui_id = ui_id
        self.address = address

    def heartbeat(self):
        """Periodically update the session database timestamp"""
        while True:
            time.sleep(2)
            with database.TransactedConnection(self.session_db) as db:
                found = db.update_heartbeat(self.ui_id)
                if not found:
                    # The session was unregistered: time to shut down
                    break
        self.close_manager()
        self.server.shutdown()

    def close_manager(self):
        """Close all websockets and stop the websocket manager"""
        manager = self.server.manager
        manager.close_all()
        manager.stop()
        manager.join()

    def start(self):
        """Start the server in a thread"""
        # The heartbeat runs in the thread, the server blocks the caller
        t = Thread(target=self.heartbeat)
        t.start()
        self.server.serve_forever()
        t.join()
        LOGGER.debug('WSGI server is down')
196

197

198 199 200
class WebSocketProtocol(WebSocket):
    """Helper-side WebSocket protocol for communication with GUI:

201
    -- INTERNAL HANDSHAKE --
    GUI: {"method": "post", "ui_id": <GUI ID>}
    HELPER: {"ACCEPTED": 202, "action": "post ui_id"} or
        {"REJECTED": 401, "action": "post ui_id"}
205

206
    -- ERRORS WITH SIGNIFICANCE --
207 208 209 210 211 212 213
    -- SHUT DOWN --
    GUI: {"method": "post", "path": "shutdown"}

    -- PAUSE --
    GUI: {"method": "post", "path": "pause"}
    HELPER: {"OK": 200, "action": "post pause"} or error

214
    -- START --
215 216 217
    GUI: {"method": "post", "path": "start"}
    HELPER: {"OK": 200, "action": "post start"} or error

218 219 220 221
    -- FORCE START --
    GUI: {"method": "post", "path": "force"}
    HELPER: {"OK": 200, "action": "post force"} or error

222 223 224 225 226 227 228 229 230
    -- GET SETTINGS --
    GUI: {"method": "get", "path": "settings"}
    HELPER:
        {
            "action": "get settings",
            "token": <user token>,
            "url": <auth url>,
            "container": <container>,
            "directory": <local directory>,
231 232 233
            "exclude": <file path>,
            "language": <en|el>,
            "sync_on_start": <true|false>
234 235 236 237 238 239 240 241 242
        } or {<ERROR>: <ERROR CODE>}

    -- PUT SETTINGS --
    GUI: {
            "method": "put", "path": "settings",
            "token": <user token>,
            "url": <auth url>,
            "container": <container>,
            "directory": <local directory>,
243 244 245
            "exclude": <file path>,
            "language": <en|el>,
            "sync_on_start": <true|false>
246 247 248 249 250 251
        }
    HELPER: {"CREATED": 201, "action": "put settings",} or
        {<ERROR>: <ERROR CODE>, "action": "put settings",}

    -- GET STATUS --
    GUI: {"method": "get", "path": "status"}
252
    HELPER: {"code": <int>,
253
            "synced": <int>, "unsynced": <int>, "failed": <int>,
254 255
            "action": "get status"
        } or {<ERROR>: <ERROR CODE>, "action": "get status"}
256
    """
257 258
    # Status is class-level, i.e. shared by all protocol instances
    status = utils.ThreadSafeDict()
    with status.lock() as d:
        d.update(code=STATUS['UNINITIALIZED'], synced=0, unsynced=0, failed=0)

    # ui_id and session_db are filled in by SessionDaemon.__init__
    ui_id = None
    session_db = None
    # Becomes True after a successful ui_id handshake (see _post)
    accepted = False
    settings = dict(
        token=None, url=None,
        container=None, directory=None,
        exclude=None, sync_on_start=True, language="en")
    cnf = AgkyraConfig()
    # Settings which must all be set before syncing can start (see can_sync)
    essentials = ('url', 'token', 'container', 'directory')
270

271 272 273 274 275
    def get_status(self, key=None):
        """Consume pending syncer messages and report the status

        :param key: (optional) a single status key to look up
        :return: updated status dict or value of specified key
        """
        if self.syncer and self.can_sync():
            self._consume_messages()
            with self.status.lock() as d:
                LOGGER.debug('Status was %s' % d['code'])
                if d['code'] in (
                        STATUS['UNINITIALIZED'], STATUS['INITIALIZING']):
                    if self.syncer.paused:
                        d['code'] = STATUS['PAUSED']
                    # NOTE(review): in this branch the code is UNINITIALIZED
                    # or INITIALIZING, so the PAUSING test presumably always
                    # holds - confirm the intent
                    elif d['code'] != STATUS['PAUSING'] or (
                            d['unsynced'] == d['synced'] + d['failed']):
                        d['code'] = STATUS['SYNCING']
        with self.status.lock() as d:
            LOGGER.debug('Status is now %s' % d['code'])
            # A copy is returned, so callers cannot alter the shared status
            return d.get(key, None) if key else dict(d)

    def set_status(self, **kwargs):
        """Update the shared status with the given key=value pairs"""
        with self.status.lock() as d:
            LOGGER.debug('CHANGING STATUS TO %s' % kwargs)
            d.update(kwargs)

293 294
    @property
    def syncer(self):
        """:returns: the first syncer object or None"""
        with SYNCERS.lock() as d:
            # Return on the first iteration; the key is of no interest
            for _, sync_obj in d.items():
                return sync_obj
        return None

301
    def clean_db(self):
        """Clean DB from current session trace"""
        LOGGER.debug('Remove current session trace')
        with database.TransactedConnection(self.session_db) as db:
            db.unregister_heartbeat(self.ui_id)
306

307
    def shutdown_syncer(self, syncer_key=0):
        """Shutdown the syncer backend object

        :param syncer_key: key of the syncer in SYNCERS (init_sync stores
            the syncer under key 0)
        """
        LOGGER.debug('Shutdown syncer')
        with SYNCERS.lock() as d:
            # Named syncer_ so the imported syncer module is not shadowed
            syncer_ = d.pop(syncer_key, None)
            if syncer_ and self.can_sync():
                syncer_.stop_all_daemons()
                LOGGER.debug('Wait open syncs to complete')
                syncer_.wait_sync_threads()

317 318 319 320
    def _get_default_sync(self):
        """Get global.default_sync or pick the first sync as default
        If there are no syncs, create a 'default' sync.
        """
321
        sync = self.cnf.get('global', 'default_sync')
322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342
        if not sync:
            for sync in self.cnf.keys('sync'):
                break
            self.cnf.set('global', 'default_sync', sync or 'default')
        return sync or 'default'

    def _get_sync_cloud(self, sync):
        """Get the <sync>.cloud or pick the first cloud and use it
        In case of cloud picking, set the cloud as the <sync>.cloud for future
        sessions.
        If no clouds are found, create a 'default' cloud, with an empty url.
        """
        try:
            cloud = self.cnf.get_sync(sync, 'cloud')
        except KeyError:
            cloud = None
        if not cloud:
            for cloud in self.cnf.keys('cloud'):
                break
            self.cnf.set_sync(sync, 'cloud', cloud or 'default')
        return cloud or 'default'
343

344
    def _load_settings(self):
        """Load the settings of the default sync from the configuration

        Whenever an expected setting cannot be loaded, the status code is
        set to SETTINGS MISSING.
        """
        LOGGER.debug('Start loading settings')
        sync = self._get_default_sync()
        cloud = self._get_sync_cloud(sync)

        try:
            self.settings['url'] = self.cnf.get_cloud(cloud, 'url')
        except Exception:
            self.settings['url'] = None
            self.set_status(code=STATUS['SETTINGS MISSING'])
        try:
            self.settings['token'] = self.cnf.get_cloud(cloud, 'token')
        except Exception:
            # Reset the token (not the url), so a missing token can never be
            # masked by a stale value, nor wipe a url that loaded fine
            self.settings['token'] = None
            self.set_status(code=STATUS['SETTINGS MISSING'])

        self.settings['sync_on_start'] = (
            self.cnf.get('global', 'sync_on_start') == 'on')
        self.settings['language'] = self.cnf.get('global', 'language')

        # 'exclude' is not loaded at the moment
        for option in ('container', 'directory'):
            try:
                self.settings[option] = self.cnf.get_sync(sync, option)
            except KeyError:
                LOGGER.debug('No %s is set' % option)
                self.set_status(code=STATUS['SETTINGS MISSING'])

        LOGGER.debug('Finished loading settings')
373

374
    def _dump_settings(self):
        """Store the current settings to the configuration"""
        LOGGER.debug('Saving settings')
        sync = self._get_default_sync()
        changes = False

        if not self.settings.get('url', None):
            LOGGER.debug('No cloud settings to save')
        else:
            LOGGER.debug('Save cloud settings')
            cloud = self._get_sync_cloud(sync)

            try:
                old_url = self.cnf.get_cloud(cloud, 'url') or ''
            except KeyError:
                old_url = self.settings['url']

            # The cloud exists with a different url: instead of overwriting
            # it, append the sync name until an unused cloud name is found
            while old_url and old_url != self.settings['url']:
                cloud = '%s_%s' % (cloud, sync)
                try:
                    self.cnf.get_cloud(cloud, 'url')
                except KeyError:
                    break

            LOGGER.debug('Cloud name is %s' % cloud)
            self.cnf.set_cloud(cloud, 'url', self.settings['url'])
            self.cnf.set_cloud(cloud, 'token', self.settings['token'] or '')
            self.cnf.set_sync(sync, 'cloud', cloud)
            changes = True

        LOGGER.debug('Save sync settings, name is %s' % sync)
        # 'exclude' is not saved at the moment
        for option in ('directory', 'container'):
            self.cnf.set_sync(sync, option, self.settings[option] or '')
            changes = True

        self.cnf.set('global', 'language', self.settings.get('language', 'en'))
        sync_on_start = self.settings.get('sync_on_start', False)
        self.cnf.set(
            'global', 'sync_on_start', 'on' if sync_on_start else 'off')

        if changes:
            self.cnf.write()
            LOGGER.debug('Settings saved')
        else:
            LOGGER.debug('No setting changes spotted')
419

420 421 422 423 424
    def _essentials_changed(self, new_settings):
        """Check if essential settings have changed in new_settings"""
        return all([
            self.settings[e] == self.settings[e] for e in self.essentials])

425
    def _consume_messages(self, max_consumption=10):
        """Update status by consuming and understanding syncer messages

        :param max_consumption: limit of messages consumed by this call
        """
        if not self.can_sync():
            return
        msg = self.syncer.get_next_message()
        while msg:
            objname = getattr(msg, 'objname', '')
            counter = None
            if isinstance(msg, messaging.SyncMessage):
                LOGGER.debug('UNSYNCED +1 %s' % objname)
                counter = 'unsynced'
            elif isinstance(msg, messaging.AckSyncMessage):
                LOGGER.debug('SYNCED +1 %s' % objname)
                counter = 'synced'
            elif isinstance(msg, messaging.SyncErrorMessage):
                LOGGER.debug('FAILED +1 %s' % objname)
                counter = 'failed'
            elif isinstance(msg, messaging.LocalfsSyncDisabled):
                LOGGER.debug('STOP BACKEND, %s' % objname)
                LOGGER.debug(
                    'CHANGE STATUS TO: %s' % STATUS['DIRECTORY ERROR'])
                self.set_status(code=STATUS['DIRECTORY ERROR'])
                self.syncer.stop_all_daemons()
            elif isinstance(msg, messaging.PithosSyncDisabled):
                LOGGER.debug('STOP BACKEND, %s' % objname)
                self.set_status(code=STATUS['CONTAINER ERROR'])
                self.syncer.stop_all_daemons()
            if counter:
                # Increase the counter straight on the status dict. Reading
                # it through get_status() would re-enter this method once
                # per message, nesting deeper with every queued message and
                # bypassing the max_consumption limit
                with self.status.lock() as d:
                    d[counter] += 1
            LOGGER.debug('Backend message: %s %s' % (msg.name, type(msg)))
            # Limit the amount of messages consumed each time
            max_consumption -= 1
            if not max_consumption:
                break
            msg = self.syncer.get_next_message()
463 464 465 466 467

    def can_sync(self):
        """Tell whether every essential setting has a (truthy) value"""
        required = [self.settings[name] for name in self.essentials]
        return all(required)

468 469
    def init_sync(self):
        """Initialize syncer

        Build the syncer for the default sync and wait for its two
        readiness messages. The outcome is stored in SYNCERS under key 0:
        the syncer if both sides are enabled, else None (the status code
        then describes the problem).
        """
        self.set_status(code=STATUS['INITIALIZING'])
        sync = self._get_default_sync()

        kwargs = dict(agkyra_path=AGKYRA_DIR)
        # Get SSL settings
        cloud = self._get_sync_cloud(sync)
        try:
            ignore_ssl = self.cnf.get_cloud(cloud, 'ignore_ssl') in ('on', )
            kwargs['ignore_ssl'] = ignore_ssl
        except KeyError:
            ignore_ssl = None
        if not ignore_ssl:
            # Certificates matter only when SSL checks are not ignored
            try:
                kwargs['ca_certs'] = self.cnf.get_cloud(cloud, 'ca_certs')
            except KeyError:
                pass

        syncer_ = None
        try:
            syncer_settings = setup.SyncerSettings(
                self.settings['url'], self.settings['token'],
                self.settings['container'], self.settings['directory'],
                **kwargs)
            master = pithos_client.PithosFileClient(syncer_settings)
            slave = localfs_client.LocalfsFileClient(syncer_settings)
            syncer_ = syncer.FileSyncer(syncer_settings, master, slave)
            # Kept for force_sync
            self.syncer_settings = syncer_settings
            # Check if syncer is ready, by consuming messages
            local_ok, remote_ok = False, False
            for i in range(2):
                LOGGER.debug('Get message %s' % (i + 1))
                msg = syncer_.get_next_message(block=True)
                LOGGER.debug('Got message: %s' % msg)

                if isinstance(msg, messaging.LocalfsSyncDisabled):
                    self.set_status(code=STATUS['DIRECTORY ERROR'])
                    local_ok = False
                    break
                elif isinstance(msg, messaging.PithosSyncDisabled):
                    self.set_status(code=STATUS['CONTAINER ERROR'])
                    remote_ok = False
                    break
                elif isinstance(msg, messaging.LocalfsSyncEnabled):
                    local_ok = True
                elif isinstance(msg, messaging.PithosSyncEnabled):
                    remote_ok = True
                else:
                    LOGGER.error('Unexpected message %s' % msg)
                    self.set_status(code=STATUS['CRITICAL ERROR'])
                    break
            if local_ok and remote_ok:
                syncer_.initiate_probe()
                self.set_status(code=STATUS['SYNCING'])
            else:
                syncer_ = None
        except pithos_client.ClientError as ce:
            LOGGER.debug('backend init failed: %s %s' % (ce, ce.status))
            code = {
                400: STATUS['AUTH URL ERROR'],
                401: STATUS['TOKEN ERROR'],
            }.get(ce.status, STATUS['UNINITIALIZED'])
            self.set_status(code=code)
        finally:
            # Start counting from scratch. 'failed' is reset along with the
            # other two counters, so that no failures of a previous syncer
            # are carried over
            self.set_status(synced=0, unsynced=0, failed=0)
            with SYNCERS.lock() as d:
                d[0] = syncer_
540

541 542 543 544 545
    # Syncer-related methods
    def get_settings(self):
        """:returns: the settings dict itself (not a copy)"""
        current = self.settings
        return current

    def set_settings(self, new_settings):
        """Set the settings and dump them to permanent storage if needed

        Syncing is paused while the settings are saved. Afterwards the
        syncer is initialized anew if needed, and syncing is started again
        if it was active before the call.

        :param new_settings: (dict) the settings to update
        """
        # Prepare setting save
        could_sync = self.syncer and self.can_sync()
        was_active = False
        if could_sync and not self.syncer.paused:
            was_active = True
            self.pause_sync()
        # Must be checked before the update, while the old values are there
        must_reset_syncing = self._essentials_changed(new_settings)

        # save settings
        self.settings.update(new_settings)
        self._dump_settings()

        # Restart
        if self.can_sync():
            if must_reset_syncing or not could_sync:
                self.init_sync()
            if was_active:
                self.start_sync()

566 567 568
    def _pause_syncer(self):
        """Stop the syncer from deciding and wait for open syncs to end"""
        syncer_ = self.syncer
        syncer_.stop_decide()
        LOGGER.debug('Wait open syncs to complete')
        syncer_.wait_sync_threads()

    def pause_sync(self):
        """Pause syncing (assuming it is up and running)

        Nothing happens if there is no syncer.
        """
        if self.syncer:
            self.set_status(code=STATUS['PAUSING'])
            self.syncer.stop_decide()
            self.set_status(code=STATUS['PAUSED'])
578 579

    def start_sync(self):
        """Start syncing (a syncer must exist) and set status to SYNCING"""
        self.syncer.start_decide()
        self.set_status(code=STATUS['SYNCING'])
583

584 585 586 587 588 589 590 591 592 593 594
    def force_sync(self):
        """Purge the syncer archives and start syncing from scratch

        Meant for recovering from a directory or container problem. The
        status ends up as SYNCING, or as CRITICAL ERROR if no syncer could
        be initialized.
        """
        self.set_status(code=STATUS['INITIALIZING'])
        self.syncer_settings.purge_db_archives_and_enable()
        self.init_sync()
        if not self.syncer:
            # Initialization did not produce a syncer
            self.set_status(code=STATUS['CRITICAL ERROR'])
            return
        self.syncer.start_decide()
        self.set_status(code=STATUS['SYNCING'])

595
    def send_json(self, msg):
        """Serialize msg as JSON and send it over the websocket"""
        LOGGER.debug('send: %s' % msg)
        self.send(json.dumps(msg))

    # Protocol handling methods
    def _post(self, r):
        """Handle POST requests"""
        if self.accepted:
            action = r['path']
            if action == 'shutdown':
605 606
                # Clean db to cause syncer backend to shut down
                self.set_status(code=STATUS['SHUTTING DOWN'])
607
                self.shutdown_syncer()
608
                self.clean_db()
609 610 611
                return
            {
                'start': self.start_sync,
612 613
                'pause': self.pause_sync,
                'force': self.force_sync
614 615
            }[action]()
            self.send_json({'OK': 200, 'action': 'post %s' % action})
616
        elif r['ui_id'] == self.ui_id:
617
            self.accepted = True
618
            self.send_json({'ACCEPTED': 202, 'action': 'post ui_id'})
619
            self._load_settings()
620
            if (not self.syncer) and self.can_sync():
621
                self.init_sync()
622
                if self.syncer and self.settings['sync_on_start']:
623
                    self.start_sync()
624
        else:
625
            action = r.get('path', 'ui_id')
626 627 628 629 630
            self.send_json({'REJECTED': 401, 'action': 'post %s' % action})
            self.terminate()

    def _put(self, r):
        """Handle PUT requests"""
631
        if self.accepted:
632
            LOGGER.debug('put %s' % r)
633 634 635 636
            action = r.pop('path')
            self.set_settings(r)
            r.update({'CREATED': 201, 'action': 'put %s' % action})
            self.send_json(r)
637 638
        else:
            action = r['path']
639 640
            self.send_json({
                'UNAUTHORIZED UI': 401, 'action': 'put %s' % action})
641
            self.terminate()
642 643 644 645 646

    def _get(self, r):
        """Handle GET requests"""
        action = r.pop('path')
        if not self.accepted:
647 648
            self.send_json({
                'UNAUTHORIZED UI': 401, 'action': 'get %s' % action})
649 650 651 652 653 654 655 656 657 658 659 660 661 662 663
            self.terminate()
        else:
            data = {
                'settings': self.get_settings,
                'status': self.get_status,
            }[action]()
            data['action'] = 'get %s' % action
            self.send_json(data)

    def received_message(self, message):
        """Route requests to corresponding handling methods

        Requests which are not a JSON object, or miss a required key, are
        answered with BAD REQUEST. Unexpected errors are answered with
        INTERNAL ERROR and the connection is terminated.

        :param message: the incoming message; its string form must be JSON
        """
        try:
            r = json.loads('%s' % message)
        except ValueError as ve:
            self.send_json({'BAD REQUEST': 400})
            LOGGER.error('JSON ERROR: %s' % ve)
            return
        if not isinstance(r, dict):
            # Valid JSON, but not an object: there is nothing to route
            self.send_json({'BAD REQUEST': 400})
            LOGGER.error('JSON ERROR: request is not an object: %r' % (r,))
            return
        # Bound before the try, so that the error handlers can refer to it
        # even if the request carries no method at all
        method = None
        try:
            method = r.pop('method')
            {
                'post': self._post,
                'put': self._put,
                'get': self._get
            }[method](r)
        except KeyError as ke:
            action = '%s %s' % (method or '', r.get('path', ''))
            self.send_json({'BAD REQUEST': 400, 'action': action})
            LOGGER.error('KEY ERROR: %s' % ke)
        except setup.ClientError as ce:
            action = '%s %s' % (
                method, r.get('path', 'ui_id' if 'ui_id' in r else ''))
            self.send_json({'%s' % ce: ce.status, 'action': action})
            return
        except Exception as e:
            self.send_json({'INTERNAL ERROR': 500})
            reason = '%s %s' % (method or '', r)
            LOGGER.error('EXCEPTION (%s): %s' % (reason, e))
            self.terminate()
687 688


Giorgos Korfiatis's avatar
Giorgos Korfiatis committed
689
def launch_server(callback, debug):
    """Launch the server in a separate process

    :param callback: the executable (or script) to launch as
        "<callback> start daemon"
    :param debug: if true, '-d' is appended to the launched command
    """
    LOGGER.info('Start SessionHelper session')
    opts = ["start", "daemon"]
    if debug:
        opts.append('-d')
    if utils.iswin():
        # A frozen bundle is launched directly, a script through pythonw
        command = [] if ISFROZEN else ["pythonw.exe"]
        command.append(callback)
        command += opts
        subprocess.Popen(command, close_fds=True)
    else:
        import daemon
        pid = os.fork()
        if not pid:
            # In the child: detach as a daemon and replace the process
            with daemon.DaemonContext():
                command = [callback, callback]
                command += opts
                os.execlp(*command)
        else:
            # Reap the forked child itself; a plain os.wait() could pick up
            # any other child of this process
            os.waitpid(pid, 0)