# protocol.py
# Copyright (C) 2015 GRNET S.A.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.

import json
import logging
import os
import subprocess
import sys
import time
from hashlib import sha1
from threading import Thread
from wsgiref.simple_server import make_server

from ws4py.server.wsgirefserver import WSGIServer, WebSocketWSGIRequestHandler
from ws4py.server.wsgiutils import WebSocketWSGIApplication
from ws4py.websocket import WebSocket

from agkyra.config import AgkyraConfig, AGKYRA_DIR
from agkyra.syncer import (
    syncer, setup, pithos_client, localfs_client, messaging, utils, database,
    common)

# Locate the directory that holds the bundled "resources" folder.
if getattr(sys, 'frozen', False):
    # we are running in a |PyInstaller| bundle
    BASEDIR = sys._MEIPASS
    ISFROZEN = True
else:
    # we are running in a normal Python environment
    BASEDIR = os.path.dirname(os.path.realpath(__file__))
    ISFROZEN = False

RESOURCES = os.path.join(BASEDIR, 'resources')

LOGGER = logging.getLogger(__name__)
# Module-wide registry of syncer objects (key 0 is used by WebSocketProtocol)
SYNCERS = utils.ThreadSafeDict()

# Status codes are shared with the UI through a common JSON resource
with open(os.path.join(RESOURCES, 'ui_data/common_en.json')) as f:
    COMMON = json.load(f)
STATUS = COMMON['STATUS']


class SessionDB(database.DB):
    """Session database: one row per UI session in the `heart` table.

    Each row holds the session id (ui_id), the helper's WebSocket address and
    the timestamp of the last heartbeat (beat). All methods use `self.db`,
    which is presumably provided by `database.DB` (defined elsewhere).
    """

    def init(self):
        """Create the `heart` table if it does not exist yet"""
        db = self.db
        db.execute(
            'CREATE TABLE IF NOT EXISTS heart ('
            'ui_id VARCHAR(256), address text, beat VARCHAR(32)'
            ')')

    def get_all_heartbeats(self):
        """:returns: all rows of `heart` as (ui_id, address, beat) tuples"""
        db = self.db
        r = db.execute('SELECT * FROM heart')
        return r.fetchall()

    def register_heartbeat(self, ui_id, address):
        """Insert a new session row, stamped with the current time"""
        db = self.db
        db.execute('INSERT INTO heart VALUES (?, ?, ?)',
                   (ui_id, address, time.time()))

    def update_heartbeat(self, ui_id):
        """Refresh the timestamp of an existing session
            :returns: True if the session was found and updated, else False
        """
        db = self.db
        r = db.execute('SELECT ui_id FROM heart WHERE ui_id=?', (ui_id,))
        if r.fetchall():
            db.execute('UPDATE heart SET beat=? WHERE ui_id=?',
                       (time.time(), ui_id))
            return True
        return False

    def unregister_heartbeat(self, ui_id):
        """Remove every row of the given session"""
        db = self.db
        db.execute('DELETE FROM heart WHERE ui_id=?', (ui_id,))


class SessionHelper(object):
    """Enables creation of a session daemon and retrieves credentials of an
    existing one
    """
    # Seconds without a heartbeat after which a session is considered dead
    session_timeout = 20

    def __init__(self, **kwargs):
        """Setup the helper server
            :param session_db: (optional keyword) path of the session
                database, defaults to "session.db" inside AGKYRA_DIR
        """
        db_name = kwargs.get(
            'session_db', os.path.join(AGKYRA_DIR, 'session.db'))
        self.session_db = common.DBTuple(dbtype=SessionDB, dbname=db_name)
        database.initialize(self.session_db)

    def load_active_session(self):
        """Load the active session in a transaction of its own
            :returns: dict(ui_id=..., address=...) or None
        """
        with database.TransactedConnection(self.session_db) as db:
            return self._load_active_session(db)

    def _load_active_session(self, db):
        """Load a session from db
            :param db: an open session database connection
            :returns: dict(ui_id=..., address=...) for the last registered
                session if its heartbeat is fresh, else None
        """
        sessions = db.get_all_heartbeats()
        if sessions:
            # Only the last row returned by the database is considered
            last, expected_id = sessions[-1], getattr(self, 'ui_id', None)
            if expected_id and last[0] != '%s' % expected_id:
                LOGGER.debug('Session ID is old')
                return None
            now, last_beat = time.time(), float(last[2])
            if now - last_beat < self.session_timeout:
                # Found an active session
                return dict(ui_id=last[0], address=last[1])
        LOGGER.debug('No active sessions found')
        return None

    def create_session_daemon(self):
        """Create and return a new daemon, or None if one exists"""
        # Check and registration share one transaction
        with database.TransactedConnection(self.session_db) as db:
            session = self._load_active_session(db)
            if session:
                return None
            session_daemon = SessionDaemon(self.session_db)
            db.register_heartbeat(session_daemon.ui_id, session_daemon.address)
            return session_daemon

    def wait_session_to_load(self, timeout=20, step=0.2):
        """Wait while the session is loading e.g. in another process
            :param timeout: approximate seconds to keep polling
            :param step: seconds to sleep between polls
            :returns: the session or None if timeout
        """
        time_passed = 0
        while time_passed < timeout:
            self.session = self.load_active_session()
            if self.session:
                return self.session
            time_passed += step
            time.sleep(step)
        return None

    def wait_session_to_stop(self, timeout=20, step=2):
        """Wait while the session is shutting down
            :param timeout: approximate seconds to keep polling
            :param step: seconds to sleep between polls
            :returns: True if stopped, False if timed out and still running
        """
        time_passed = 0
        while time_passed < timeout and self.load_active_session():
            time.sleep(step)
            time_passed += step
        return not bool(self.load_active_session())


class SessionDaemon(object):
    """A WebSocket server which inspects a heartbeat and decides whether to
    shut down
    """

    def __init__(self, session_db, *args, **kwargs):
        """Create the WebSocket server on a free local port
            :param session_db: the session database descriptor, shared with
                WebSocketProtocol through class attributes
        """
        self.session_db = session_db
        ui_id = sha1(os.urandom(128)).hexdigest()

        LOCAL_ADDR = '127.0.0.1'
        # The protocol handler is instantiated by ws4py, so the session
        # credentials are handed over as class attributes
        WebSocketProtocol.ui_id = ui_id
        WebSocketProtocol.session_db = session_db
        # Port 0 lets the OS pick a free port
        server = make_server(
            LOCAL_ADDR, 0,
            server_class=WSGIServer,
            handler_class=WebSocketWSGIRequestHandler,
            app=WebSocketWSGIApplication(handler_cls=WebSocketProtocol))
        server.initialize_websockets_manager()
        address = 'ws://%s:%s' % (LOCAL_ADDR, server.server_port)
        self.server = server
        self.ui_id = ui_id
        self.address = address

    def heartbeat(self):
        """Periodically update the session database timestamp

        Stops when the session row is no longer in the database, then shuts
        the server down. The shutdown also runs if the update raises, so
        that a failing heartbeat does not leave the server running.
        """
        try:
            while True:
                time.sleep(2)
                with database.TransactedConnection(self.session_db) as db:
                    found = db.update_heartbeat(self.ui_id)
                    if not found:
                        break
        finally:
            self.close_manager()
            self.server.shutdown()

    def close_manager(self):
        """Close all WebSockets and stop the server's WebSocket manager"""
        manager = self.server.manager
        manager.close_all()
        manager.stop()
        manager.join()

    def start(self):
        """Start the heartbeat in a thread and serve until it shuts us down"""
        t = Thread(target=self.heartbeat)
        t.start()
        self.server.serve_forever()
        t.join()
        LOGGER.debug('WSGI server is down')


class WebSocketProtocol(WebSocket):
    """Helper-side WebSocket protocol for communication with GUI:

    -- INTERNAL HANDSHAKE --
    GUI: {"method": "post", "ui_id": <GUI ID>}
    HELPER: {"ACCEPTED": 202, "action": "post ui_id"} or
        {"REJECTED": 401, "action": "post ui_id"}

    -- ERRORS WITH SIGNIFICANCE --
    -- SHUT DOWN --
    GUI: {"method": "post", "path": "shutdown"}

    -- PAUSE --
    GUI: {"method": "post", "path": "pause"}
    HELPER: {"OK": 200, "action": "post pause"} or error

    -- START --
    GUI: {"method": "post", "path": "start"}
    HELPER: {"OK": 200, "action": "post start"} or error

    -- FORCE START --
    GUI: {"method": "post", "path": "force"}
    HELPER: {"OK": 200, "action": "post force"} or error

    -- GET SETTINGS --
    GUI: {"method": "get", "path": "settings"}
    HELPER:
        {
            "action": "get settings",
            "token": <user token>,
            "url": <auth url>,
            "container": <container>,
            "directory": <local directory>,
            "exclude": <file path>,
            "language": <en|el>,
            "sync_on_start": <true|false>
        } or {<ERROR>: <ERROR CODE>}

    -- PUT SETTINGS --
    GUI: {
            "method": "put", "path": "settings",
            "token": <user token>,
            "url": <auth url>,
            "container": <container>,
            "directory": <local directory>,
            "exclude": <file path>,
            "language": <en|el>,
            "sync_on_start": <true|false>
        }
    HELPER: {"CREATED": 201, "action": "put settings",} or
        {<ERROR>: <ERROR CODE>, "action": "put settings",}

    -- GET STATUS --
    GUI: {"method": "get", "path": "status"}
    HELPER: {"code": <int>,
            "synced": <int>, "unsynced": <int>, "failed": <int>,
            "action": "get status"
        } or {<ERROR>: <ERROR CODE>, "action": "get status"}
    """
    # Class-level state: shared by every connection handled in this process
    status = utils.ThreadSafeDict()
    with status.lock() as d:
        d.update(code=STATUS['UNINITIALIZED'], synced=0, unsynced=0, failed=0)

    # ui_id and session_db are filled in by SessionDaemon.__init__
    ui_id = None
    session_db = None
    accepted = False
    settings = dict(
        token=None, url=None,
        container=None, directory=None,
        exclude=None, sync_on_start=True, language="en")
    cnf = AgkyraConfig()
    # Settings without which syncing cannot be set up (see can_sync)
    essentials = ('url', 'token', 'container', 'directory')

    def get_status(self, key=None):
        """:return: updated status dict or value of specified key"""
        if self.syncer and self.can_sync():
            self._consume_messages()
            with self.status.lock() as d:
                LOGGER.debug('Status was %s' % d['code'])
                if d['code'] in (
                        STATUS['UNINITIALIZED'], STATUS['INITIALIZING']):
                    if self.syncer.paused:
                        d['code'] = STATUS['PAUSED']
                    elif d['code'] != STATUS['PAUSING'] or (
                            d['unsynced'] == d['synced'] + d['failed']):
                        d['code'] = STATUS['SYNCING']
        with self.status.lock() as d:
            LOGGER.debug('Status is now %s' % d['code'])
            # Hand out a copy, so that callers cannot alter the shared dict
            return d.get(key, None) if key else dict(d)

    def set_status(self, **kwargs):
        """Update the shared status dict with the given key-value pairs"""
        with self.status.lock() as d:
            LOGGER.debug('Set status to %s' % kwargs)
            d.update(kwargs)

    def _bump_status(self, counter):
        """Add one to a status counter, without going through get_status"""
        with self.status.lock() as d:
            d[counter] = d[counter] + 1

    @property
    def syncer(self):
        """:returns: the first syncer object or None"""
        with SYNCERS.lock() as d:
            for sync_key, sync_obj in d.items():
                return sync_obj
        return None

    def clean_db(self):
        """Clean DB from current session trace"""
        LOGGER.debug('Remove current session trace')
        with database.TransactedConnection(self.session_db) as db:
            db.unregister_heartbeat(self.ui_id)

    def shutdown_syncer(self, syncer_key=0):
        """Shutdown the syncer backend object
            :param syncer_key: key of the syncer in SYNCERS, removed from it
        """
        LOGGER.debug('Shutdown syncer')
        with SYNCERS.lock() as d:
            syncer = d.pop(syncer_key, None)
            if syncer and self.can_sync():
                syncer.stop_all_daemons()
                LOGGER.debug('Wait open syncs to complete')
                syncer.wait_sync_threads()

    def _get_default_sync(self):
        """Get global.default_sync or pick the first sync as default
        If there are no syncs, create a 'default' sync.
        """
        sync = self.cnf.get('global', 'default_sync')
        if not sync:
            for sync in self.cnf.keys('sync'):
                break
            self.cnf.set('global', 'default_sync', sync or 'default')
        return sync or 'default'

    def _get_sync_cloud(self, sync):
        """Get the <sync>.cloud or pick the first cloud and use it
        In case of cloud picking, set the cloud as the <sync>.cloud for future
        sessions.
        If no clouds are found, create a 'default' cloud, with an empty url.
        """
        try:
            cloud = self.cnf.get_sync(sync, 'cloud')
        except KeyError:
            cloud = None
        if not cloud:
            for cloud in self.cnf.keys('cloud'):
                break
            self.cnf.set_sync(sync, 'cloud', cloud or 'default')
        return cloud or 'default'

    def _load_settings(self):
        """Load settings from the configuration into self.settings

        Sets the status to SETTINGS MISSING if any of url, token, container
        or directory is missing or empty.
        """
        LOGGER.debug('Start loading settings')
        sync = self._get_default_sync()
        cloud = self._get_sync_cloud(sync)

        for option in ('url', 'token'):
            # Any failure to read the option counts as a missing setting
            try:
                value = self.cnf.get_cloud(cloud, option)
            except Exception:
                value = None
            self.settings[option] = value or None
            if not value:
                self.set_status(code=STATUS['SETTINGS MISSING'])

        self.settings['sync_on_start'] = (
            self.cnf.get('global', 'sync_on_start') == 'on')
        self.settings['language'] = self.cnf.get('global', 'language')

        # 'exclude' is currently not read from the configuration
        for option in ('container', 'directory'):
            try:
                value = self.cnf.get_sync(sync, option)
            except KeyError:
                value = None
            if value:
                self.settings[option] = value
            else:
                # The previous value (if any) is left as it is
                LOGGER.debug('No %s is set' % option)
                self.set_status(code=STATUS['SETTINGS MISSING'])

        LOGGER.debug('Finished loading settings')

    def _dump_settings(self):
        """Store self.settings to the configuration and write it"""
        LOGGER.debug('Saving settings')
        sync = self._get_default_sync()
        changes = False

        if not self.settings.get('url', None):
            LOGGER.debug('No cloud settings to save')
        else:
            LOGGER.debug('Save cloud settings')
            cloud = self._get_sync_cloud(sync)

            try:
                old_url = self.cnf.get_cloud(cloud, 'url') or ''
            except KeyError:
                old_url = self.settings['url']

            # The URL changed: do not overwrite the old cloud, look for a
            # cloud name which is not in the configuration yet
            while old_url and old_url != self.settings['url']:
                cloud = '%s_%s' % (cloud, sync)
                try:
                    self.cnf.get_cloud(cloud, 'url')
                except KeyError:
                    break

            LOGGER.debug('Cloud name is %s' % cloud)
            self.cnf.set_cloud(cloud, 'url', self.settings['url'])
            self.cnf.set_cloud(cloud, 'token', self.settings['token'] or '')
            self.cnf.set_sync(sync, 'cloud', cloud)
            changes = True

        LOGGER.debug('Save sync settings, name is %s' % sync)
        # 'exclude' is currently not written to the configuration
        for option in ('directory', 'container'):
            self.cnf.set_sync(sync, option, self.settings[option] or '')
            changes = True

        self.cnf.set('global', 'language', self.settings.get('language', 'en'))
        sync_on_start = self.settings.get('sync_on_start', False)
        self.cnf.set(
            'global', 'sync_on_start', 'on' if sync_on_start else 'off')

        if changes:
            self.cnf.write()
            LOGGER.debug('Settings saved')
        else:
            LOGGER.debug('No setting changes spotted')

    def _essentials_changed(self, new_settings):
        """Check if essential settings have changed in new_settings
            :param new_settings: dict of settings about to be applied
            :returns: True if any essential setting present in new_settings
                differs from the current one; absent keys count as unchanged
        """
        return any(
            e in new_settings and new_settings[e] != self.settings[e]
            for e in self.essentials)

    def _consume_messages(self, max_consumption=10):
        """Update status by consuming and understanding syncer messages
            :param max_consumption: maximum number of messages to consume
        """
        if not self.can_sync():
            return
        msg = self.syncer.get_next_message()
        while msg:
            objname = getattr(msg, 'objname', '')
            # Counters are bumped directly: going through get_status here
            # would re-enter this method once per message
            if isinstance(msg, messaging.SyncMessage):
                LOGGER.debug('UNSYNCED +1 %s' % objname)
                self._bump_status('unsynced')
            elif isinstance(msg, messaging.AckSyncMessage):
                LOGGER.debug('SYNCED +1 %s' % objname)
                self._bump_status('synced')
            elif isinstance(msg, messaging.SyncErrorMessage):
                LOGGER.debug('FAILED +1 %s' % objname)
                self._bump_status('failed')
            elif isinstance(msg, messaging.LocalfsSyncDisabled):
                LOGGER.debug('STOP BACKEND, %s' % objname)
                LOGGER.debug(
                    'CHANGE STATUS TO: %s' % STATUS['DIRECTORY ERROR'])
                self.set_status(code=STATUS['DIRECTORY ERROR'])
                self.syncer.stop_all_daemons()
            elif isinstance(msg, messaging.PithosSyncDisabled):
                LOGGER.debug('STOP BACKEND, %s' % objname)
                self.set_status(code=STATUS['CONTAINER ERROR'])
                self.syncer.stop_all_daemons()
            LOGGER.debug('Backend message: %s %s' % (msg.name, type(msg)))
            # Limit the amount of messages consumed each time
            max_consumption -= 1
            if not max_consumption:
                break
            msg = self.syncer.get_next_message()

    def can_sync(self):
        """Check if settings are enough to setup a syncing process"""
        return all([self.settings[e] for e in self.essentials])

    def init_sync(self):
        """Initialize syncer

        Builds the syncer from the current settings and stores it in
        SYNCERS[0]; None is stored if the initialization did not succeed.
        The status code is set according to the outcome.
        """
        self.set_status(code=STATUS['INITIALIZING'])
        sync = self._get_default_sync()

        kwargs = dict(agkyra_path=AGKYRA_DIR)
        # Get SSL settings
        cloud = self._get_sync_cloud(sync)
        try:
            ignore_ssl = self.cnf.get_cloud(cloud, 'ignore_ssl') in ('on', )
            kwargs['ignore_ssl'] = ignore_ssl
        except KeyError:
            ignore_ssl = None
        if not ignore_ssl:
            try:
                kwargs['ca_certs'] = self.cnf.get_cloud(cloud, 'ca_certs')
            except KeyError:
                pass

        syncer_ = None
        try:
            syncer_settings = setup.SyncerSettings(
                self.settings['url'], self.settings['token'],
                self.settings['container'], self.settings['directory'],
                **kwargs)
            master = pithos_client.PithosFileClient(syncer_settings)
            slave = localfs_client.LocalfsFileClient(syncer_settings)
            syncer_ = syncer.FileSyncer(syncer_settings, master, slave)
            self.syncer_settings = syncer_settings
            # Check if syncer is ready, by consuming messages
            local_ok, remote_ok = False, False
            for i in range(2):
                LOGGER.debug('Get message %s' % (i + 1))
                msg = syncer_.get_next_message(block=True)
                LOGGER.debug('Got message: %s' % msg)

                if isinstance(msg, messaging.LocalfsSyncDisabled):
                    self.set_status(code=STATUS['DIRECTORY ERROR'])
                    local_ok = False
                    break
                elif isinstance(msg, messaging.PithosSyncDisabled):
                    self.set_status(code=STATUS['CONTAINER ERROR'])
                    remote_ok = False
                    break
                elif isinstance(msg, messaging.LocalfsSyncEnabled):
                    local_ok = True
                elif isinstance(msg, messaging.PithosSyncEnabled):
                    remote_ok = True
                else:
                    LOGGER.error('Unexpected message %s' % msg)
                    self.set_status(code=STATUS['CRITICAL ERROR'])
                    break
            if local_ok and remote_ok:
                syncer_.initiate_probe()
                self.set_status(code=STATUS['SYNCING'])
            else:
                syncer_ = None
        except pithos_client.ClientError as ce:
            LOGGER.debug('backend init failed: %s %s' % (ce, ce.status))
            try:
                code = {
                    400: STATUS['AUTH URL ERROR'],
                    401: STATUS['TOKEN ERROR'],
                }[ce.status]
            except KeyError:
                code = STATUS['UNINITIALIZED']
            self.set_status(code=code)
        finally:
            # NOTE(review): 'failed' is not reset here - confirm it is meant
            # to survive a re-initialization
            self.set_status(synced=0, unsynced=0)
            with SYNCERS.lock() as d:
                d[0] = syncer_

    # Syncer-related methods
    def get_settings(self):
        """:returns: the settings dict itself (not a copy)"""
        return self.settings

    def set_settings(self, new_settings):
        """Set the settings and dump them to permanent storage if needed

        The backend is (re)initialized if an essential setting changed or if
        no backend is currently active. An active backend whose essential
        settings are unchanged keeps running undisturbed.
        """
        # Prepare setting save
        old_status = self.get_status('code')
        active = (STATUS['SYNCING'], STATUS['PAUSING'], STATUS['PAUSED'])

        # Must be evaluated before self.settings is updated
        must_reset_syncing = self._essentials_changed(new_settings) or (
            old_status not in active)
        if must_reset_syncing and old_status in active:
            LOGGER.debug('Temporary backend shutdown to save settings')
            self.shutdown_syncer()

        # save settings
        self.settings.update(new_settings)
        self._dump_settings()

        # Restart
        LOGGER.debug('Reload settings')
        self._load_settings()
        can_sync = must_reset_syncing and self.can_sync()
        if can_sync:
            LOGGER.debug('Restart backend')
            self.init_sync()
            new_status = self.get_status('code')
            if new_status in active:
                # Resume what was going on, or honor sync_on_start if the
                # backend was not active before
                must_sync = old_status == STATUS['SYNCING'] or (
                    old_status not in active and (
                        self.settings.get('sync_on_start', False)))
                (self.start_sync if must_sync else self.pause_sync)()

    def _pause_syncer(self):
        """Stop deciding and wait for the open sync threads to finish"""
        syncer_ = self.syncer
        syncer_.stop_decide()
        LOGGER.debug('Wait open syncs to complete')
        syncer_.wait_sync_threads()

    def pause_sync(self):
        """Pause syncing (assuming it is up and running)"""
        if self.syncer:
            self.set_status(code=STATUS['PAUSING'])
            self.syncer.stop_decide()
            self.set_status(code=STATUS['PAUSED'])

    def start_sync(self):
        """Start syncing"""
        self.syncer.start_decide()
        self.set_status(code=STATUS['SYNCING'])

    def force_sync(self):
        """Force syncing, assuming there is a directory or container problem"""
        self.set_status(code=STATUS['INITIALIZING'])
        self.syncer_settings.purge_db_archives_and_enable()
        self.init_sync()
        if self.syncer:
            self.syncer.start_decide()
            self.set_status(code=STATUS['SYNCING'])
        else:
            self.set_status(code=STATUS['CRITICAL ERROR'])

    def send_json(self, msg):
        """Serialize msg as JSON and send it to the GUI"""
        LOGGER.debug('send: %s' % msg)
        self.send(json.dumps(msg))

    # Protocol handling methods
    def _post(self, r):
        """Handle POST requests

        An accepted GUI posts an action in "path"; otherwise the request is
        treated as the handshake and must carry the expected "ui_id".
        """
        if self.accepted:
            action = r['path']
            if action == 'shutdown':
                # Clean db to cause syncer backend to shut down
                self.set_status(code=STATUS['SHUTTING DOWN'])
                self.shutdown_syncer()
                self.clean_db()
                return
            # An unknown action raises KeyError (see received_message)
            {
                'start': self.start_sync,
                'pause': self.pause_sync,
                'force': self.force_sync
            }[action]()
            self.send_json({'OK': 200, 'action': 'post %s' % action})
        elif r['ui_id'] == self.ui_id:
            self.accepted = True
            self.send_json({'ACCEPTED': 202, 'action': 'post ui_id'})
            self._load_settings()
            if (not self.syncer) and self.can_sync():
                self.init_sync()
                if self.syncer and self.settings['sync_on_start']:
                    self.start_sync()
        else:
            action = r.get('path', 'ui_id')
            self.send_json({'REJECTED': 401, 'action': 'post %s' % action})
            self.terminate()

    def _put(self, r):
        """Handle PUT requests"""
        if self.accepted:
            LOGGER.debug('put %s' % r)
            action = r.pop('path')
            self.set_settings(r)
            # The reply echoes the submitted settings
            r.update({'CREATED': 201, 'action': 'put %s' % action})
            self.send_json(r)
        else:
            action = r['path']
            self.send_json({
                'UNAUTHORIZED UI': 401, 'action': 'put %s' % action})
            self.terminate()

    def _get(self, r):
        """Handle GET requests"""
        action = r.pop('path')
        if not self.accepted:
            self.send_json({
                'UNAUTHORIZED UI': 401, 'action': 'get %s' % action})
            self.terminate()
        else:
            # Work on a copy: get_settings returns the shared settings dict,
            # which must not acquire an "action" key
            data = dict({
                'settings': self.get_settings,
                'status': self.get_status,
            }[action]())
            data['action'] = 'get %s' % action
            self.send_json(data)

    def received_message(self, message):
        """Route requests to corresponding handling methods"""
        try:
            r = json.loads('%s' % message)
        except ValueError as ve:
            self.send_json({'BAD REQUEST': 400})
            LOGGER.error('JSON ERROR: %s' % ve)
            return
        # Bound up front, so that the handlers below can always refer to it
        method = None
        try:
            method = r.pop('method')
            {
                'post': self._post,
                'put': self._put,
                'get': self._get
            }[method](r)
        except KeyError as ke:
            action = '%s %s' % (method or '', r.get('path', ''))
            self.send_json({'BAD REQUEST': 400, 'action': action})
            LOGGER.error('KEY ERROR: %s' % ke)
        except setup.ClientError as ce:
            action = '%s %s' % (
                method, r.get('path', 'ui_id' if 'ui_id' in r else ''))
            self.send_json({'%s' % ce: ce.status, 'action': action})
            return
        except Exception as e:
            self.send_json({'INTERNAL ERROR': 500})
            reason = '%s %s' % (method or '', r)
            LOGGER.error('EXCEPTION (%s): %s' % (reason, e))
            self.terminate()


def launch_server(callback, debug):
    """Launch the server in a separate process
        :param callback: the executable (or script) to run with the
            arguments "start daemon"
        :param debug: if true, "-d" is appended to the arguments
    """
    LOGGER.info('Start SessionHelper session')
    opts = ["start", "daemon"]
    if debug:
        opts.append('-d')
    if utils.iswin():
        # A frozen bundle is executed directly, a script through pythonw
        command = [] if ISFROZEN else ["pythonw.exe"]
        command.append(callback)
        command += opts
        subprocess.Popen(command, close_fds=True)
    else:
        import daemon
        pid = os.fork()
        if not pid:
            with daemon.DaemonContext():
                # execlp takes the file to run, then argv (argv[0] included)
                command = [callback, callback]
                command += opts
                os.execlp(*command)
        else:
            # Reap the child we forked, not any other child of this process
            os.waitpid(pid, 0)