protocol.py 28.9 KB
Newer Older
Giorgos Korfiatis's avatar
Giorgos Korfiatis committed
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
# Copyright (C) 2015 GRNET S.A.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.

16
from wsgiref.simple_server import make_server
17
from ws4py.websocket import WebSocket
18 19 20 21
from ws4py.server.wsgiutils import WebSocketWSGIApplication
from ws4py.server.wsgirefserver import WSGIServer, WebSocketWSGIRequestHandler
from hashlib import sha1
from threading import Thread
22
import time
23
import os
24
import sys
25 26
import json
import logging
Giorgos Korfiatis's avatar
Giorgos Korfiatis committed
27
import subprocess
28
from agkyra.syncer import (
29 30
    syncer, setup, pithos_client, localfs_client, messaging, utils, database,
    common)
31
from agkyra.config import AgkyraConfig, AGKYRA_DIR
32

33 34 35 36 37 38 39 40
# Locate the directory holding the bundled resources: a |PyInstaller| bundle
# exposes it as sys._MEIPASS, a normal Python environment uses the directory
# of this very file.
ISFROZEN = bool(getattr(sys, 'frozen', False))
if ISFROZEN:
    BASEDIR = sys._MEIPASS
else:
    BASEDIR = os.path.dirname(os.path.realpath(__file__))

RESOURCES = os.path.join(BASEDIR, 'resources')

LOGGER = logging.getLogger(__name__)
# Registry of syncer objects, accessed through its lock() context manager
SYNCERS = utils.ThreadSafeDict()

# Status codes are read from the common UI data file
with open(os.path.join(RESOURCES, 'ui_data/common_en.json')) as f:
    COMMON = json.load(f)
STATUS = COMMON['STATUS']
50

51

52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81
class SessionDB(database.DB):
    """Access layer for the ``heart`` table, which stores one row per
    session: the UI id, the server address and the time of the last beat.
    """

    def init(self):
        """Create the ``heart`` table unless it already exists"""
        self.db.execute(
            'CREATE TABLE IF NOT EXISTS heart ('
            'ui_id VARCHAR(256), address text, beat VARCHAR(32)'
            ')')

    def get_all_heartbeats(self):
        """:returns: all rows of the ``heart`` table"""
        cursor = self.db.execute('SELECT * FROM heart')
        return cursor.fetchall()

    def register_heartbeat(self, ui_id, address):
        """Insert a new row for ui_id/address, stamped with the current time"""
        conn = self.db
        row = (ui_id, address, time.time())
        conn.execute('INSERT INTO heart VALUES (?, ?, ?)', row)

    def update_heartbeat(self, ui_id):
        """Refresh the beat timestamp of ui_id

        :returns: True if a row for ui_id exists (and was updated),
            False otherwise
        """
        conn = self.db
        cursor = conn.execute(
            'SELECT ui_id FROM heart WHERE ui_id=?', (ui_id,))
        if not cursor.fetchall():
            return False
        conn.execute('UPDATE heart SET beat=? WHERE ui_id=?',
                     (time.time(), ui_id))
        return True

    def unregister_heartbeat(self, ui_id):
        """Delete the rows of ui_id"""
        self.db.execute('DELETE FROM heart WHERE ui_id=?', (ui_id,))
82 83


84
class SessionHelper(object):
    """Enables creation of a session daemon and retrieves credentials of an
    existing one.

    A session is considered active while its last heartbeat in the session
    database is less than ``session_timeout`` seconds old.
    """
    session_timeout = 20

    def __init__(self, **kwargs):
        """Setup the helper server

        :param session_db: (keyword, optional) name of the session database,
            defaults to ``session.db`` inside AGKYRA_DIR
        """
        default_name = os.path.join(AGKYRA_DIR, 'session.db')
        self.session_db = common.DBTuple(
            dbtype=SessionDB, dbname=kwargs.get('session_db', default_name))
        database.initialize(self.session_db)

    def load_active_session(self):
        """Open a transacted connection and load the active session from it

        :returns: dict(ui_id=..., address=...) or None
        """
        with database.TransactedConnection(self.session_db) as db:
            return self._load_active_session(db)

    def _load_active_session(self, db):
        """Load a session from db

        Only the last heartbeat row is examined. If this object has a
        ``ui_id`` attribute, the row must carry the same id.

        :returns: dict(ui_id=..., address=...) or None
        """
        heartbeats = db.get_all_heartbeats()
        if not heartbeats:
            LOGGER.debug('No active sessions found')
            return None

        latest = heartbeats[-1]
        own_id = getattr(self, 'ui_id', None)
        if own_id and latest[0] != '%s' % own_id:
            LOGGER.debug('Session ID is old')
            return None

        age = time.time() - float(latest[2])
        if age < self.session_timeout:
            # Found an active session
            return dict(ui_id=latest[0], address=latest[1])

        LOGGER.debug('No active sessions found')
        return None

    def create_session_daemon(self):
        """Create and return a new daemon, or None if one exists"""
        with database.TransactedConnection(self.session_db) as db:
            if self._load_active_session(db):
                return None
            daemon = SessionDaemon(self.session_db)
            db.register_heartbeat(daemon.ui_id, daemon.address)
            return daemon

    def wait_session_to_load(self, timeout=20, step=0.2):
        """Wait while the session is loading e.g. in another process

        The loaded session is also stored in ``self.session``.

        :param timeout: seconds to keep polling
        :param step: seconds to sleep between two polls
        :returns: the session or None if timeout
        """
        waited = 0
        while waited < timeout:
            self.session = self.load_active_session()
            if self.session:
                return self.session
            waited += step
            time.sleep(step)
        return None

    def wait_session_to_stop(self, timeout=20, step=2):
        """Wait while the session is shutting down

        :param timeout: seconds to keep polling
        :param step: seconds to sleep between two polls
        :returns: True if stopped, False if timed out and still running
        """
        waited = 0
        while waited < timeout and self.load_active_session():
            time.sleep(step)
            waited += step
        return not self.load_active_session()

149 150 151 152 153

class SessionDaemon(object):
    """A WebSocket server which inspects a heartbeat and decides whether to
    shut down
    """

    def __init__(self, session_db, *args, **kwargs):
        """Build a WebSocket server on 127.0.0.1, on a port picked by the OS

        A random ``ui_id`` is generated; it and ``session_db`` are published
        as class attributes of WebSocketProtocol.
        """
        LOCAL_ADDR = '127.0.0.1'
        self.session_db = session_db
        new_id = sha1(os.urandom(128)).hexdigest()

        WebSocketProtocol.ui_id = new_id
        WebSocketProtocol.session_db = session_db

        # Port 0: the actual port is read back from server.server_port
        ws_app = WebSocketWSGIApplication(handler_cls=WebSocketProtocol)
        ws_server = make_server(
            LOCAL_ADDR, 0,
            server_class=WSGIServer,
            handler_class=WebSocketWSGIRequestHandler,
            app=ws_app)
        ws_server.initialize_websockets_manager()

        self.server = ws_server
        self.ui_id = new_id
        self.address = 'ws://%s:%s' % (LOCAL_ADDR, ws_server.server_port)

    def heartbeat(self):
        """Periodically update the session database timestamp

        Every 2 seconds the heartbeat row of this session is refreshed. When
        the row is no longer found, the loop ends and the websocket manager
        and the server are shut down.
        """
        alive = True
        while alive:
            time.sleep(2)
            with database.TransactedConnection(self.session_db) as db:
                alive = db.update_heartbeat(self.ui_id)
        self.close_manager()
        self.server.shutdown()

    def close_manager(self):
        """Close all websockets, then stop the manager and wait for it"""
        ws_manager = self.server.manager
        ws_manager.close_all()
        ws_manager.stop()
        ws_manager.join()

    def start(self):
        """Start the server in a thread

        The heartbeat runs in a separate thread while this call serves
        requests; it returns after the heartbeat thread has finished.
        """
        beat_thread = Thread(target=self.heartbeat)
        beat_thread.start()
        self.server.serve_forever()
        beat_thread.join()
        LOGGER.debug('WSGI server is down')
196

197

198 199 200
class WebSocketProtocol(WebSocket):
    """Helper-side WebSocket protocol for communication with GUI:

    -- INTERNAL HANDSHAKE --
    GUI: {"method": "post", "ui_id": <GUI ID>}
    HELPER: {"ACCEPTED": 202, "action": "post ui_id"} or
        {"REJECTED": 401, "action": "post ui_id"}

    -- ERRORS WITH SIGNIFICANCE --
    -- SHUT DOWN --
    GUI: {"method": "post", "path": "shutdown"}

    -- PAUSE --
    GUI: {"method": "post", "path": "pause"}
    HELPER: {"OK": 200, "action": "post pause"} or error

    -- START --
    GUI: {"method": "post", "path": "start"}
    HELPER: {"OK": 200, "action": "post start"} or error

    -- FORCE START --
    GUI: {"method": "post", "path": "force"}
    HELPER: {"OK": 200, "action": "post force"} or error

    -- GET SETTINGS --
    GUI: {"method": "get", "path": "settings"}
    HELPER:
        {
            "action": "get settings",
            "token": <user token>,
            "url": <auth url>,
            "container": <container>,
            "directory": <local directory>,
            "exclude": <file path>,
            "language": <en|el>,
            "sync_on_start": <true|false>
        } or {<ERROR>: <ERROR CODE>}

    -- PUT SETTINGS --
    GUI: {
            "method": "put", "path": "settings",
            "token": <user token>,
            "url": <auth url>,
            "container": <container>,
            "directory": <local directory>,
            "exclude": <file path>,
            "language": <en|el>,
            "sync_on_start": <true|false>
        }
    HELPER: {"CREATED": 201, "action": "put settings",} or
        {<ERROR>: <ERROR CODE>, "action": "put settings",}

    -- GET STATUS --
    GUI: {"method": "get", "path": "status"}
    HELPER: {"code": <int>,
            "synced": <int>, "unsynced": <int>, "failed": <int>,
            "action": "get status"
        } or {<ERROR>: <ERROR CODE>, "action": "get status"}
    """
    # Class-level status dict (one object for all protocol instances),
    # always accessed through its lock() context manager
    status = utils.ThreadSafeDict()
    with status.lock() as d:
        d.update(code=STATUS['UNINITIALIZED'], synced=0, unsynced=0, failed=0)

    # ui_id and session_db are assigned on the class by SessionDaemon
    ui_id = None
    session_db = None
    # Becomes True on an instance once the GUI has posted the right ui_id
    accepted = False
    # Class-level settings dict: mutated in place, so it is the same object
    # for every protocol instance
    settings = dict(
        token=None, url=None,
        container=None, directory=None,
        exclude=None, sync_on_start=True, language="en")
    cnf = AgkyraConfig()
    # Settings that must all be set before syncing is possible (see can_sync)
    essentials = ('url', 'token', 'container', 'directory')
270

271 272 273 274 275
    def get_status(self, key=None):
        """Refresh the status from the syncer (if any) and report it

        When a syncer exists and the settings allow syncing, pending syncer
        messages are consumed first, and a status still in UNINITIALIZED or
        INITIALIZING is moved to PAUSED or SYNCING.

        :param key: (optional) a single status key to look up
        :return: updated status dict or value of specified key
        """
        if self.syncer and self.can_sync():
            self._consume_messages()
            with self.status.lock() as state:
                code = state['code']
                LOGGER.debug('Status was %s' % code)
                starting = code in (
                    STATUS['UNINITIALIZED'], STATUS['INITIALIZING'])
                if starting and self.syncer.paused:
                    state['code'] = STATUS['PAUSED']
                elif starting and (
                        code != STATUS['PAUSING'] or
                        state['unsynced'] == (
                            state['synced'] + state['failed'])):
                    state['code'] = STATUS['SYNCING']

        with self.status.lock() as state:
            LOGGER.debug('Status is now %s' % state['code'])
            if key:
                return state.get(key, None)
            # Hand out a copy, so that callers cannot alter the shared dict
            return dict(state)

    def set_status(self, **kwargs):
        """Update the shared status dict with the given key=value pairs"""
        with self.status.lock() as state:
            LOGGER.debug('Set status to %s' % kwargs)
            state.update(kwargs)

293 294
    @property
    def syncer(self):
        """:returns: the first syncer object or None"""
        with SYNCERS.lock() as registry:
            for first_syncer in registry.values():
                return first_syncer
        return None

301
    def clean_db(self):
        """Clean DB from current session trace, i.e. remove the heartbeat
        registered under this ui_id
        """
        LOGGER.debug('Remove current session trace')
        with database.TransactedConnection(self.session_db) as session:
            session.unregister_heartbeat(self.ui_id)
306

307
    def shutdown_syncer(self, syncer_key=0, timeout=None):
        """Shutdown the syncer backend object

        The syncer is removed from SYNCERS in any case; its daemons are
        stopped only if it exists and the settings allow syncing.

        :param syncer_key: key of the syncer in SYNCERS
        :param timeout: passed to stop_all_daemons; what that call returns
            is used as the timeout for waiting the sync threads
        """
        LOGGER.debug('Shutdown syncer')
        with SYNCERS.lock() as registry:
            backend = registry.pop(syncer_key, None)
            if not (backend and self.can_sync()):
                return
            remaining = backend.stop_all_daemons(timeout=timeout)
            LOGGER.debug('Wait open syncs to complete')
            backend.wait_sync_threads(timeout=remaining)
316

317 318 319 320
    def _get_default_sync(self):
        """Get global.default_sync or pick the first sync as default
        If there are no syncs, create a 'default' sync.
        """
321
        sync = self.cnf.get('global', 'default_sync')
322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342
        if not sync:
            for sync in self.cnf.keys('sync'):
                break
            self.cnf.set('global', 'default_sync', sync or 'default')
        return sync or 'default'

    def _get_sync_cloud(self, sync):
        """Get the <sync>.cloud or pick the first cloud and use it
        In case of cloud picking, set the cloud as the <sync>.cloud for future
        sessions.
        If no clouds are found, create a 'default' cloud, with an empty url.
        """
        try:
            cloud = self.cnf.get_sync(sync, 'cloud')
        except KeyError:
            cloud = None
        if not cloud:
            for cloud in self.cnf.keys('cloud'):
                break
            self.cnf.set_sync(sync, 'cloud', cloud or 'default')
        return cloud or 'default'
343

344
    def _load_settings(self):
        """Load the settings of the default sync from the configuration

        The status code is set to SETTINGS MISSING whenever one of url,
        token, container or directory is not set.
        """
        LOGGER.debug('Start loading settings')
        sync = self._get_default_sync()
        cloud = self._get_sync_cloud(sync)

        # Cloud options: an empty or unreadable value is stored as None
        for option in ('url', 'token'):
            try:
                value = self.cnf.get_cloud(cloud, option)
            except Exception:
                value = None
            self.settings[option] = value or None
            if not value:
                self.set_status(code=STATUS['SETTINGS MISSING'])

        sync_on_start = self.cnf.get('global', 'sync_on_start')
        self.settings['sync_on_start'] = (sync_on_start == 'on')
        self.settings['language'] = self.cnf.get('global', 'language')

        # Sync options ('exclude' is not loaded): when a value is missing,
        # the setting held in memory is left as it is
        for option in ('container', 'directory'):
            try:
                value = self.cnf.get_sync(sync, option)
            except KeyError:
                value = None
            if value:
                self.settings[option] = value
            else:
                LOGGER.debug('No %s is set' % option)
                self.set_status(code=STATUS['SETTINGS MISSING'])

        LOGGER.debug('Finished loading settings')
375

376
    def _dump_settings(self):
        """Store the current settings to the configuration and write it

        Cloud settings are stored only if a url is set. If the cloud of the
        default sync already holds a different url, a new cloud name is
        derived by appending "_<sync>" until an unused name is found.
        """
        LOGGER.debug('Saving settings')
        sync = self._get_default_sync()
        changes = False

        new_url = self.settings.get('url', None)
        if new_url:
            LOGGER.debug('Save cloud settings')
            cloud = self._get_sync_cloud(sync)

            try:
                old_url = self.cnf.get_cloud(cloud, 'url') or ''
            except KeyError:
                old_url = new_url

            if old_url and old_url != new_url:
                # Do not overwrite the existing cloud: find an unused name
                while True:
                    cloud = '%s_%s' % (cloud, sync)
                    try:
                        self.cnf.get_cloud(cloud, 'url')
                    except KeyError:
                        break

            LOGGER.debug('Cloud name is %s' % cloud)
            self.cnf.set_cloud(cloud, 'url', new_url)
            self.cnf.set_cloud(cloud, 'token', self.settings['token'] or '')
            self.cnf.set_sync(sync, 'cloud', cloud)
            changes = True
        else:
            LOGGER.debug('No cloud settings to save')

        LOGGER.debug('Save sync settings, name is %s' % sync)
        # 'exclude' is not stored
        for option in ('directory', 'container'):
            self.cnf.set_sync(sync, option, self.settings[option] or '')
            changes = True

        language = self.settings.get('language', 'en')
        self.cnf.set('global', 'language', language)
        if self.settings.get('sync_on_start', False):
            self.cnf.set('global', 'sync_on_start', 'on')
        else:
            self.cnf.set('global', 'sync_on_start', 'off')

        if not changes:
            LOGGER.debug('No setting changes spotted')
            return
        self.cnf.write()
        LOGGER.debug('Settings saved')
421

422 423 424 425 426
    def _essentials_changed(self, new_settings):
        """Check if essential settings have changed in new_settings"""
        return all([
            self.settings[e] == self.settings[e] for e in self.essentials])

427
    def _consume_messages(self, max_consumption=10):
        """Update status by consuming and understanding syncer messages

        Sync, ack and error messages increase the unsynced, synced and
        failed counters respectively. The "disabled" and error messages set
        the matching error status code and stop all syncer daemons.

        :param max_consumption: the loop stops when this counter, decreased
            once per message, reaches zero
        """
        if not self.can_sync():
            return

        msg = self.syncer.get_next_message()
        while msg:
            objname = getattr(msg, 'objname', '')
            if isinstance(msg, messaging.SyncMessage):
                LOGGER.debug('UNSYNCED +1 %s' % objname)
                self.set_status(unsynced=self.get_status('unsynced') + 1)
            elif isinstance(msg, messaging.AckSyncMessage):
                LOGGER.debug('SYNCED +1 %s' % objname)
                self.set_status(synced=self.get_status('synced') + 1)
            elif isinstance(msg, messaging.SyncErrorMessage):
                LOGGER.debug('FAILED +1 %s' % objname)
                self.set_status(failed=self.get_status('failed') + 1)
            elif isinstance(msg, messaging.LocalfsSyncDisabled):
                LOGGER.debug('STOP BACKEND, %s' % objname)
                LOGGER.debug(
                    'CHANGE STATUS TO: %s' % STATUS['DIRECTORY ERROR'])
                self.set_status(code=STATUS['DIRECTORY ERROR'])
                self.syncer.stop_all_daemons()
            elif isinstance(msg, messaging.PithosSyncDisabled):
                LOGGER.debug('STOP BACKEND, %s' % objname)
                self.set_status(code=STATUS['CONTAINER ERROR'])
                self.syncer.stop_all_daemons()
            elif isinstance(msg, messaging.PithosAuthTokenError):
                LOGGER.debug('STOP BACKEND, %s' % objname)
                self.set_status(code=STATUS['TOKEN ERROR'])
                self.syncer.stop_all_daemons()
            elif isinstance(msg, messaging.PithosGenericError):
                LOGGER.debug('STOP BACKEND, %s' % objname)
                self.set_status(code=STATUS['CRITICAL ERROR'])
                self.syncer.stop_all_daemons()
            LOGGER.debug('Backend message: %s %s' % (msg.name, type(msg)))

            # Limit the amount of messages consumed each time
            max_consumption -= 1
            if not max_consumption:
                break
            msg = self.syncer.get_next_message()
475 476 477 478 479

    def can_sync(self):
        """Check if settings are enough to setup a syncing process

        :returns: True if every setting named in ``self.essentials`` has a
            non-empty value
        """
        essential_values = [self.settings[name] for name in self.essentials]
        return all(essential_values)

480
    def init_sync(self, leave_paused=False):
        """Initialize syncer

        A FileSyncer is built from the current settings, then two messages
        are awaited from it: both the local and the remote endpoint must
        report "enabled" for the status to become READY (or PAUSED).
        Whatever happens, the synced/unsynced counters are reset and the
        syncer (or None, if it could not be created) is stored in SYNCERS
        under key 0.

        :param leave_paused: on success set the status to PAUSED instead of
            READY
        """
        self.set_status(code=STATUS['INITIALIZING'])
        sync = self._get_default_sync()

        kwargs = dict(agkyra_path=AGKYRA_DIR)
        # Get SSL settings
        cloud = self._get_sync_cloud(sync)
        try:
            ignore_ssl = self.cnf.get_cloud(cloud, 'ignore_ssl') == 'on'
        except KeyError:
            ignore_ssl = None
        else:
            kwargs['ignore_ssl'] = ignore_ssl
        if not ignore_ssl:
            try:
                kwargs['ca_certs'] = self.cnf.get_cloud(cloud, 'ca_certs')
            except KeyError:
                pass

        syncer_ = None
        try:
            syncer_settings = setup.SyncerSettings(
                self.settings['url'], self.settings['token'],
                self.settings['container'], self.settings['directory'],
                **kwargs)
            master = pithos_client.PithosFileClient(syncer_settings)
            slave = localfs_client.LocalfsFileClient(syncer_settings)
            syncer_ = syncer.FileSyncer(syncer_settings, master, slave)

            # Check if syncer is ready, by consuming messages
            local_ok = remote_ok = False
            for attempt in (1, 2):
                LOGGER.debug('Get message %s' % attempt)
                msg = syncer_.get_next_message(block=True)
                LOGGER.debug('Got message: %s' % msg)

                if isinstance(msg, messaging.LocalfsSyncDisabled):
                    self.set_status(code=STATUS['DIRECTORY ERROR'])
                    local_ok = False
                    break
                elif isinstance(msg, messaging.PithosSyncDisabled):
                    self.set_status(code=STATUS['CONTAINER ERROR'])
                    remote_ok = False
                    break
                elif isinstance(msg, messaging.LocalfsSyncEnabled):
                    local_ok = True
                elif isinstance(msg, messaging.PithosSyncEnabled):
                    remote_ok = True
                else:
                    LOGGER.error('Unexpected message %s' % msg)
                    self.set_status(code=STATUS['CRITICAL ERROR'])
                    break

            if local_ok and remote_ok:
                syncer_.initiate_probe()
                if leave_paused:
                    self.set_status(code=STATUS['PAUSED'])
                else:
                    self.set_status(code=STATUS['READY'])
        except pithos_client.ClientError as ce:
            LOGGER.debug('backend init failed: %s %s' % (ce, ce.status))
            # Any status other than 400 and 401 is reported as UNINITIALIZED
            known_codes = {
                400: STATUS['AUTH URL ERROR'],
                401: STATUS['TOKEN ERROR'],
            }
            self.set_status(
                code=known_codes.get(ce.status, STATUS['UNINITIALIZED']))
        finally:
            self.set_status(synced=0, unsynced=0)
            with SYNCERS.lock() as registry:
                registry[0] = syncer_
550

551 552 553 554 555
    # Syncer-related methods
    def get_settings(self):
        """:returns: the settings dict itself (not a copy)"""
        return self.settings

    def set_settings(self, new_settings):
        """Set the settings and dump them to permanent storage if needed

        If the essential settings are reported as changed, a running syncer
        is shut down before saving and, if the reloaded settings allow
        syncing, initialized again afterwards.

        :param new_settings: dict of settings to merge into the current ones
        """
        # Prepare setting save
        old_status = self.get_status('code')
        ok_not_syncing = [STATUS['READY'], STATUS['PAUSING'], STATUS['PAUSED']]
        active = ok_not_syncing + [STATUS['SYNCING']]

        must_reset_syncing = self._essentials_changed(new_settings)
        if must_reset_syncing and old_status in active:
            LOGGER.debug('Temporary backend shutdown to save settings')
            self.shutdown_syncer()

        # save settings
        self.settings.update(new_settings)
        self._dump_settings()

        # Restart
        LOGGER.debug('Reload settings')
        self._load_settings()
        if must_reset_syncing and self.can_sync():
            # Do not resume syncing if it was not syncing before, or if
            # sync_on_start is off
            leave_paused = old_status in ok_not_syncing or \
                not self.settings.get('sync_on_start', False)
            LOGGER.debug('Restart backend')
            self.init_sync(leave_paused=leave_paused)
581

582 583 584
    def _pause_syncer(self):
        """Stop the syncer from deciding and wait for its sync threads"""
        backend = self.syncer
        backend.stop_decide()
        LOGGER.debug('Wait open syncs to complete')
        backend.wait_sync_threads()

    def pause_sync(self):
        """Pause syncing (assuming it is up and running)

        Nothing happens if there is no syncer.
        """
        if not self.syncer:
            return
        self.set_status(code=STATUS['PAUSING'])
        self.syncer.stop_decide()
        self.set_status(code=STATUS['PAUSED'])
594 595

    def start_sync(self):
        """Start syncing and set the status to SYNCING

        There is no check for a missing syncer here.
        """
        backend = self.syncer
        backend.start_decide()
        self.set_status(code=STATUS['SYNCING'])
599

600 601 602
    def force_sync(self):
        """Force syncing, assuming there is a directory or container problem

        The status goes through INITIALIZING while the syncer settings purge
        the db archives and a new probe is initiated, and ends up READY.
        """
        initializing, ready = STATUS['INITIALIZING'], STATUS['READY']
        self.set_status(code=initializing)
        self.syncer.settings.purge_db_archives_and_enable()
        self.syncer.initiate_probe()
        self.set_status(code=ready)
606

607
    def send_json(self, msg):
        """Log msg and send it serialized as JSON"""
        LOGGER.debug('send: %s' % msg)
        payload = json.dumps(msg)
        self.send(payload)

    # Protocol handling methods
    def _post(self, r):
        """Handle POST requests"""
        if self.accepted:
            action = r['path']
            if action == 'shutdown':
617 618
                # Clean db to cause syncer backend to shut down
                self.set_status(code=STATUS['SHUTTING DOWN'])
619
                self.shutdown_syncer(timeout=5)
620
                self.clean_db()
621 622
                return
            {
623
                'init': self.init_sync,
624
                'start': self.start_sync,
625 626
                'pause': self.pause_sync,
                'force': self.force_sync
627 628
            }[action]()
            self.send_json({'OK': 200, 'action': 'post %s' % action})
629
        elif r['ui_id'] == self.ui_id:
630
            self.accepted = True
631
            self.send_json({'ACCEPTED': 202, 'action': 'post ui_id'})
632
            self._load_settings()
633 634 635
            status = self.get_status('code')
            if self.can_sync() and status == STATUS['UNINITIALIZED']:
                self.set_status(code=STATUS['SETTINGS READY'])
636
        else:
637
            action = r.get('path', 'ui_id')
638 639 640 641 642
            self.send_json({'REJECTED': 401, 'action': 'post %s' % action})
            self.terminate()

    def _put(self, r):
        """Handle PUT requests"""
643
        if self.accepted:
644
            LOGGER.debug('put %s' % r)
645 646 647 648
            action = r.pop('path')
            self.set_settings(r)
            r.update({'CREATED': 201, 'action': 'put %s' % action})
            self.send_json(r)
649 650
        else:
            action = r['path']
651 652
            self.send_json({
                'UNAUTHORIZED UI': 401, 'action': 'put %s' % action})
653
            self.terminate()
654 655 656 657 658

    def _get(self, r):
        """Handle GET requests"""
        action = r.pop('path')
        if not self.accepted:
659 660
            self.send_json({
                'UNAUTHORIZED UI': 401, 'action': 'get %s' % action})
661 662 663 664 665 666 667 668 669 670 671 672 673 674 675
            self.terminate()
        else:
            data = {
                'settings': self.get_settings,
                'status': self.get_status,
            }[action]()
            data['action'] = 'get %s' % action
            self.send_json(data)

    def received_message(self, message):
        """Route requests to corresponding handling methods

        The message must be a JSON object with a "method" of post, put or
        get. Replies on failure:
            - invalid JSON or not a JSON object: BAD REQUEST 400
            - missing/unknown method or missing key: BAD REQUEST 400
            - setup.ClientError: the error text with its status
            - anything else: INTERNAL ERROR 500, connection terminated
        """
        try:
            r = json.loads('%s' % message)
        except ValueError as ve:
            self.send_json({'BAD REQUEST': 400})
            LOGGER.error('JSON ERROR: %s' % ve)
            return
        if not isinstance(r, dict):
            # Valid JSON, but the protocol only knows JSON objects
            self.send_json({'BAD REQUEST': 400})
            LOGGER.error('JSON ERROR: not an object: %s' % (r,))
            return

        # Bound before the try block, so that the error handlers can use it
        # even when the request carries no "method"
        method = None
        try:
            method = r.pop('method')
            {
                'post': self._post,
                'put': self._put,
                'get': self._get
            }[method](r)
        except KeyError as ke:
            action = '%s %s' % (method or '', r.get('path', ''))
            self.send_json({'BAD REQUEST': 400, 'action': action})
            LOGGER.error('KEY ERROR: %s' % ke)
        except setup.ClientError as ce:
            action = '%s %s' % (
                method, r.get('path', 'ui_id' if 'ui_id' in r else ''))
            self.send_json({'%s' % ce: ce.status, 'action': action})
            return
        except Exception as e:
            self.send_json({'INTERNAL ERROR': 500})
            reason = '%s %s' % (method or '', r)
            LOGGER.error('EXCEPTION (%s): %s' % (reason, e))
            self.terminate()
699 700


Giorgos Korfiatis's avatar
Giorgos Korfiatis committed
701 702 703 704 705 706 707 708 709 710 711 712 713 714 715 716 717 718 719 720 721 722 723 724 725 726 727 728 729 730 731 732 733 734 735 736 737 738 739 740 741 742 743 744 745 746 747 748 749 750 751 752 753 754 755 756 757 758 759 760 761 762 763 764 765 766 767 768 769 770 771
def close_fds():
    """Close all file descriptors from 2 up to the descriptor limit

    Descriptors 0 and 1 are left alone. The upper bound is the hard
    RLIMIT_NOFILE limit, or MAXFD if that limit is infinite. Descriptors
    which are not open are silently skipped.
    """
    import resource
    # Default maximum for the number of available file descriptors.
    MAXFD = 1024

    maxfd = resource.getrlimit(resource.RLIMIT_NOFILE)[1]
    if maxfd == resource.RLIM_INFINITY:
        maxfd = MAXFD

    # os.closerange ignores errors (e.g. descriptors that were not open)
    # and does not loop in Python over a potentially huge range.
    os.closerange(2, maxfd)


# Adapted from https://code.activestate.com/recipes/278731/
def daemonize(command):
    """Run command as a daemon, detached from the calling process

    The process forks twice; the grandchild changes to the root directory,
    closes the inherited file descriptors, redirects stdin, stdout and
    stderr to the null device and replaces itself with the command. The
    caller returns once the intermediate child has exited.

    :param command: sequence passed as ``os.execlp(*command)``, i.e. the
        program to run followed by its argument list (argv[0] included)
    """
    # Default working directory for the daemon.
    WORKDIR = "/"

    # The standard I/O file descriptors are redirected to /dev/null by default.
    REDIRECT_TO = getattr(os, "devnull", "/dev/null")

    pid = os.fork()
    if pid:
        # Caller: reap the intermediate child (and only that one), so that
        # it does not linger as a zombie.
        os.waitpid(pid, 0)
        return

    try:
        # To become the session leader of this new session and the process
        # group leader of the new process group, we call os.setsid(). The
        # process is also guaranteed not to have a controlling terminal.
        os.setsid()

        # Fork a second child and exit immediately to prevent zombies.
        if os.fork():
            os._exit(0)

        # Since the current working directory may be a mounted filesystem,
        # we avoid the issue of not being able to unmount the filesystem at
        # shutdown time by changing it to the root directory.
        os.chdir(WORKDIR)

        # Close the open file descriptors inherited from the parent.
        close_fds()

        # Redirect the standard I/O file descriptors to the null device, to
        # prevent side-effects from reads and writes to them. close_fds()
        # does not close descriptors 0 and 1, so the descriptor returned by
        # os.open is not necessarily 0: duplicate it explicitly onto each
        # standard descriptor.
        devnull_fd = os.open(REDIRECT_TO, os.O_RDWR)
        for std_fd in (0, 1, 2):
            if std_fd != devnull_fd:
                os.dup2(devnull_fd, std_fd)
        if devnull_fd > 2:
            os.close(devnull_fd)
        elif hasattr(os, 'set_inheritable'):
            # Python 3 opens descriptors as non-inheritable; this one IS a
            # standard descriptor, so it must survive the exec.
            os.set_inheritable(devnull_fd, True)

        os.execlp(*command)
    finally:
        # Reached only if something above failed (a successful exec never
        # returns): never let a child continue into the caller's code.
        os._exit(1)


Giorgos Korfiatis's avatar
Giorgos Korfiatis committed
772
def launch_server(callback, debug):
    """Launch the server in a separate process

    :param callback: the program to run; it is given the arguments
        "start daemon" (plus "-d" if debug)
    :param debug: append the "-d" option to the command
    """
    LOGGER.info('Start SessionHelper session')
    opts = ["start", "daemon"] + (['-d'] if debug else [])

    if not utils.iswin():
        # execlp style: the program, then argv starting with argv[0]
        daemonize([callback, callback] + opts)
        return

    # A frozen callback is run directly, otherwise through pythonw.exe
    interpreter = [] if ISFROZEN else ["pythonw.exe"]
    command = interpreter + [callback] + opts
    subprocess.Popen(command, close_fds=True)