protocol.py 29 KB
Newer Older
Giorgos Korfiatis's avatar
Giorgos Korfiatis committed
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
# Copyright (C) 2015 GRNET S.A.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.

16
from wsgiref.simple_server import make_server
17
from ws4py.websocket import WebSocket
18 19 20 21
from ws4py.server.wsgiutils import WebSocketWSGIApplication
from ws4py.server.wsgirefserver import WSGIServer, WebSocketWSGIRequestHandler
from hashlib import sha1
from threading import Thread
22
import time
23
import os
24
import sys
25 26
import json
import logging
Giorgos Korfiatis's avatar
Giorgos Korfiatis committed
27
import subprocess
28
from agkyra.syncer import (
29 30
    syncer, setup, pithos_client, localfs_client, messaging, utils, database,
    common)
31
from agkyra.config import AgkyraConfig, AGKYRA_DIR
32

33 34 35 36 37 38 39 40
# A PyInstaller bundle sets ``sys.frozen`` and exposes its unpack directory
# as ``sys._MEIPASS``; otherwise resources are resolved next to this file.
ISFROZEN = bool(getattr(sys, 'frozen', False))
BASEDIR = (
    sys._MEIPASS if ISFROZEN
    else os.path.dirname(os.path.realpath(__file__)))
41

42
# Directory of the static resources, relative to BASEDIR
RESOURCES = os.path.join(BASEDIR, 'resources')

LOGGER = logging.getLogger(__name__)

# Module-wide registry of syncer objects, used by WebSocketProtocol
SYNCERS = utils.ThreadSafeDict()

# The status codes are read from the common UI data file
with open(os.path.join(RESOURCES, 'ui_data/common_en.json')) as f:
    COMMON = json.load(f)
STATUS = COMMON['STATUS']
50

51

52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81
class SessionDB(database.DB):
    """Storage of session heartbeats.

    Every row of the ``heart`` table holds the UI id of a registered
    session, its address and the timestamp of its latest beat.
    """

    def init(self):
        """Create the ``heart`` table, unless it already exists"""
        conn = self.db
        conn.execute(
            'CREATE TABLE IF NOT EXISTS heart ('
            'ui_id VARCHAR(256), address text, beat VARCHAR(32)'
            ')')

    def get_all_heartbeats(self):
        """:returns: all the rows of the ``heart`` table"""
        conn = self.db
        return conn.execute('SELECT * FROM heart').fetchall()

    def register_heartbeat(self, ui_id, address):
        """Insert a heartbeat row for ui_id/address, stamped with now"""
        conn = self.db
        row = (ui_id, address, time.time())
        conn.execute('INSERT INTO heart VALUES (?, ?, ?)', row)

    def update_heartbeat(self, ui_id):
        """Refresh the beat timestamp of ui_id

        :returns: True if ui_id is registered (and was refreshed), else False
        """
        conn = self.db
        known = conn.execute(
            'SELECT ui_id FROM heart WHERE ui_id=?', (ui_id,)).fetchall()
        if not known:
            return False
        conn.execute(
            'UPDATE heart SET beat=? WHERE ui_id=?', (time.time(), ui_id))
        return True

    def unregister_heartbeat(self, ui_id):
        """Delete the heartbeat rows of ui_id"""
        conn = self.db
        conn.execute('DELETE FROM heart WHERE ui_id=?', (ui_id,))
82 83


84
class SessionHelper(object):
    """Enables creation of a session daemon and retrieves credentials of an
    existing one
    """
    # Seconds after which a heartbeat is considered stale
    session_timeout = 20

    def __init__(self, **kwargs):
        """Setup the helper server

        :param session_db: (optional) path of the session database, defaults
            to ``session.db`` inside AGKYRA_DIR
        """
        default_path = os.path.join(AGKYRA_DIR, 'session.db')
        db_name = kwargs.get('session_db', default_path)
        self.session_db = common.DBTuple(dbtype=SessionDB, dbname=db_name)
        database.initialize(self.session_db)

    def load_active_session(self):
        """Load a session from db, inside a transaction of its own"""
        with database.TransactedConnection(self.session_db) as db:
            return self._load_active_session(db)

    def _load_active_session(self, db):
        """Load a session from db

        :returns: dict(ui_id=..., address=...) for the last registered
            heartbeat if it is fresh enough, otherwise None
        """
        heartbeats = db.get_all_heartbeats()
        if heartbeats:
            latest = heartbeats[-1]
            expected_id = getattr(self, 'ui_id', None)
            if expected_id and latest[0] != '%s' % expected_id:
                LOGGER.debug('Session ID is old')
                return None
            age = time.time() - float(latest[2])
            if age < self.session_timeout:
                # Found an active session
                return dict(ui_id=latest[0], address=latest[1])
        LOGGER.debug('No active sessions found')
        return None

    def create_session_daemon(self):
        """Create and return a new daemon, or None if one exists"""
        with database.TransactedConnection(self.session_db) as db:
            if self._load_active_session(db):
                return None
            daemon = SessionDaemon(self.session_db)
            db.register_heartbeat(daemon.ui_id, daemon.address)
            return daemon

    def wait_session_to_load(self, timeout=20, step=0.2):
        """Wait while the session is loading e.g. in another process
            :returns: the session or None if timeout
        """
        waited = 0
        while waited < timeout:
            self.session = self.load_active_session()
            if self.session:
                return self.session
            time.sleep(step)
            waited += step
        return None

    def wait_session_to_stop(self, timeout=20, step=2):
        """Wait while the session is shutting down
            :returns: True if stopped, False if timed out and still running
        """
        waited = 0
        while waited < timeout and self.load_active_session():
            time.sleep(step)
            waited += step
        return not self.load_active_session()

149 150 151 152 153

class SessionDaemon(object):
    """A WebSocket server which inspects a heartbeat and decides whether to
    shut down
    """

    def __init__(self, session_db, *args, **kwargs):
        """Create the local WebSocket server on a free port

        A random ui_id is generated and, along with session_db, is stored on
        the WebSocketProtocol class so that the handlers can use them.
        """
        self.session_db = session_db
        self.ui_id = sha1(os.urandom(128)).hexdigest()

        local_addr = '127.0.0.1'
        WebSocketProtocol.ui_id = self.ui_id
        WebSocketProtocol.session_db = session_db
        application = WebSocketWSGIApplication(handler_cls=WebSocketProtocol)
        # Port 0 lets the OS pick a free port
        self.server = make_server(
            local_addr, 0,
            server_class=WSGIServer,
            handler_class=WebSocketWSGIRequestHandler,
            app=application)
        self.server.initialize_websockets_manager()
        self.address = 'ws://%s:%s' % (local_addr, self.server.server_port)

    def heartbeat(self):
        """Periodically update the session database timestamp

        When the session is no longer found in the database, close the
        WebSocket manager and shut the server down.
        """
        registered = True
        while registered:
            time.sleep(2)
            with database.TransactedConnection(self.session_db) as db:
                registered = db.update_heartbeat(self.ui_id)
        self.close_manager()
        self.server.shutdown()

    def close_manager(self):
        """Close all connections, then stop and join the server manager"""
        ws_manager = self.server.manager
        ws_manager.close_all()
        ws_manager.stop()
        ws_manager.join()

    def start(self):
        """Start the server in a thread"""
        beater = Thread(target=self.heartbeat)
        beater.start()
        # Blocks until heartbeat() shuts the server down
        self.server.serve_forever()
        beater.join()
        LOGGER.debug('WSGI server is down')
196

197

198 199 200
class WebSocketProtocol(WebSocket):
    """Helper-side WebSocket protocol for communication with GUI:

201
    -- INTERNAL HANDSHAKE --
202
    GUI: {"method": "post", "ui_id": <GUI ID>}
203
    HELPER: {"ACCEPTED": 202, "action": "post ui_id"} or
204
        {"REJECTED": 401, "action": "post ui_id"}
205

206
    -- ERRORS WITH SIGNIFICANCE --
207 208 209 210 211 212 213
    -- SHUT DOWN --
    GUI: {"method": "post", "path": "shutdown"}

    -- PAUSE --
    GUI: {"method": "post", "path": "pause"}
    HELPER: {"OK": 200, "action": "post pause"} or error

214
    -- START --
215 216 217
    GUI: {"method": "post", "path": "start"}
    HELPER: {"OK": 200, "action": "post start"} or error

218 219 220 221
    -- FORCE START --
    GUI: {"method": "post", "path": "force"}
    HELPER: {"OK": 200, "action": "post force"} or error

222 223 224 225 226 227 228 229 230
    -- GET SETTINGS --
    GUI: {"method": "get", "path": "settings"}
    HELPER:
        {
            "action": "get settings",
            "token": <user token>,
            "url": <auth url>,
            "container": <container>,
            "directory": <local directory>,
231 232 233
            "exclude": <file path>,
            "language": <en|el>,
            "sync_on_start": <true|false>
234 235 236 237 238 239 240 241 242
        } or {<ERROR>: <ERROR CODE>}

    -- PUT SETTINGS --
    GUI: {
            "method": "put", "path": "settings",
            "token": <user token>,
            "url": <auth url>,
            "container": <container>,
            "directory": <local directory>,
243 244 245
            "exclude": <file path>,
            "language": <en|el>,
            "sync_on_start": <true|false>
246 247 248 249 250 251
        }
    HELPER: {"CREATED": 201, "action": "put settings",} or
        {<ERROR>: <ERROR CODE>, "action": "put settings",}

    -- GET STATUS --
    GUI: {"method": "get", "path": "status"}
252
    HELPER: {"code": <int>,
253
            "synced": <int>, "unsynced": <int>, "failed": <int>,
254 255
            "action": "get status"
        } or {<ERROR>: <ERROR CODE>, "action": "get status"}
256
    """
257 258
    status = utils.ThreadSafeDict()
    with status.lock() as d:
259
        d.update(code=STATUS['UNINITIALIZED'], synced=0, unsynced=0, failed=0)
260

261
    ui_id = None
262
    session_db = None
263 264 265 266
    accepted = False
    settings = dict(
        token=None, url=None,
        container=None, directory=None,
267
        exclude=None, sync_on_start=True, language="en")
268
    cnf = AgkyraConfig()
269
    essentials = ('url', 'token', 'container', 'directory')
270

271 272 273 274 275
    def get_status(self, key=None):
        """:return: updated status dict or value of specified key

        If there is a syncer and the settings allow syncing, pending syncer
        messages are consumed first and a starting-up status code is promoted
        to PAUSED or SYNCING.
        """
        if self.syncer and self.can_sync():
            self._consume_messages()
            with self.status.lock() as d:
                LOGGER.debug('Status was %s' % d['code'])
                starting_up = (
                    STATUS['UNINITIALIZED'], STATUS['INITIALIZING'])
                if d['code'] in starting_up:
                    if self.syncer.paused:
                        d['code'] = STATUS['PAUSED']
                    elif d['code'] != STATUS['PAUSING'] or (
                            d['unsynced'] == d['synced'] + d['failed']):
                        d['code'] = STATUS['SYNCING']
        with self.status.lock() as d:
            LOGGER.debug('Status is now %s' % d['code'])
            if key:
                return d.get(key, None)
            # Hand out a copy, so that callers cannot alter the shared status
            return dict(d)

    def set_status(self, **kwargs):
        """Update the shared status dict with the given key/value pairs"""
        with self.status.lock() as shared_status:
            LOGGER.debug('Set status to %s' % kwargs)
            shared_status.update(kwargs)

293 294
    @property
    def syncer(self):
        """:returns: the first syncer object or None"""
        with SYNCERS.lock() as registry:
            return next(iter(registry.values()), None)

301
    def clean_db(self):
        """Clean DB from current session trace"""
        LOGGER.debug('Remove current session trace')
        with database.TransactedConnection(self.session_db) as session:
            session.unregister_heartbeat(self.ui_id)
306

307
    def shutdown_syncer(self, syncer_key=0):
        """Shutdown the syncer backend object

        The syncer registered under syncer_key is removed from the registry;
        if the settings allow syncing, its daemons are stopped and the open
        syncs are waited for.
        """
        LOGGER.debug('Shutdown syncer')
        with SYNCERS.lock() as registry:
            backend = registry.pop(syncer_key, None)
            if not (backend and self.can_sync()):
                return
            backend.stop_all_daemons()
            LOGGER.debug('Wait open syncs to complete')
            backend.wait_sync_threads()

317 318 319 320
    def _get_default_sync(self):
        """Get global.default_sync or pick the first sync as default
        If there are no syncs, create a 'default' sync.
        """
321
        sync = self.cnf.get('global', 'default_sync')
322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342
        if not sync:
            for sync in self.cnf.keys('sync'):
                break
            self.cnf.set('global', 'default_sync', sync or 'default')
        return sync or 'default'

    def _get_sync_cloud(self, sync):
        """Get the <sync>.cloud or pick the first cloud and use it
        In case of cloud picking, set the cloud as the <sync>.cloud for future
        sessions.
        If no clouds are found, create a 'default' cloud, with an empty url.
        """
        try:
            cloud = self.cnf.get_sync(sync, 'cloud')
        except KeyError:
            cloud = None
        if not cloud:
            for cloud in self.cnf.keys('cloud'):
                break
            self.cnf.set_sync(sync, 'cloud', cloud or 'default')
        return cloud or 'default'
343

344
    def _load_settings(self):
        """Load the settings of the default sync from the configuration

        Whenever one of url, token, container or directory has no value, the
        status code is set to SETTINGS MISSING.
        """
        LOGGER.debug('Start loading settings')
        sync = self._get_default_sync()
        cloud = self._get_sync_cloud(sync)

        # Cloud options: a missing or empty value resets the setting to None
        for option in ('url', 'token'):
            try:
                value = self.cnf.get_cloud(cloud, option)
            except Exception:
                value = None
            if value:
                self.settings[option] = value
            else:
                self.settings[option] = None
                self.set_status(code=STATUS['SETTINGS MISSING'])

        sync_on_start = self.cnf.get('global', 'sync_on_start')
        self.settings['sync_on_start'] = (sync_on_start == 'on')
        self.settings['language'] = self.cnf.get('global', 'language')

        # Sync options ('exclude' is not loaded): a missing or empty value
        # leaves the current setting as it is
        for option in ('container', 'directory'):
            try:
                value = self.cnf.get_sync(sync, option)
            except KeyError:
                value = None
            if value:
                self.settings[option] = value
            else:
                LOGGER.debug('No %s is set' % option)
                self.set_status(code=STATUS['SETTINGS MISSING'])

        LOGGER.debug('Finished loading settings')
375

376
    def _dump_settings(self):
        """Save the current settings to the configuration

        Cloud settings are saved only if a url is set. If the url differs
        from the one stored for the current cloud, the settings go to a new
        cloud, named by appending "_<sync>" until an unused name is found.
        """
        LOGGER.debug('Saving settings')
        sync = self._get_default_sync()
        dirty = False

        new_url = self.settings.get('url', None)
        if new_url:
            LOGGER.debug('Save cloud settings')
            cloud = self._get_sync_cloud(sync)

            try:
                old_url = self.cnf.get_cloud(cloud, 'url') or ''
            except KeyError:
                old_url = new_url

            if old_url and old_url != new_url:
                # Do not overwrite another cloud: look for an unused name
                while True:
                    cloud = '%s_%s' % (cloud, sync)
                    try:
                        self.cnf.get_cloud(cloud, 'url')
                    except KeyError:
                        break

            LOGGER.debug('Cloud name is %s' % cloud)
            self.cnf.set_cloud(cloud, 'url', new_url)
            self.cnf.set_cloud(cloud, 'token', self.settings['token'] or '')
            self.cnf.set_sync(sync, 'cloud', cloud)
            dirty = True
        else:
            LOGGER.debug('No cloud settings to save')

        LOGGER.debug('Save sync settings, name is %s' % sync)
        # 'exclude' is not saved
        for option in ('directory', 'container'):
            self.cnf.set_sync(sync, option, self.settings[option] or '')
            dirty = True

        self.cnf.set('global', 'language', self.settings.get('language', 'en'))
        on_or_off = 'on' if self.settings.get('sync_on_start', False) else 'off'
        self.cnf.set('global', 'sync_on_start', on_or_off)

        if dirty:
            self.cnf.write()
            LOGGER.debug('Settings saved')
        else:
            LOGGER.debug('No setting changes spotted')
421

422 423 424 425 426
    def _essentials_changed(self, new_settings):
        """Check if essential settings have changed in new_settings"""
        return all([
            self.settings[e] == self.settings[e] for e in self.essentials])

427
    def _consume_messages(self, max_consumption=10):
        """Update status by consuming and understanding syncer messages

        :param max_consumption: how many messages to consume at most

        The counters are incremented directly under the status lock. Reading
        them through get_status() would call this method again for each
        message, nesting one call per queued message and making
        max_consumption ineffective.
        """
        backend = self.syncer
        if not (backend and self.can_sync()):
            # Nothing to consume without a syncer
            return

        def increment(counter):
            with self.status.lock() as d:
                d[counter] += 1

        msg = backend.get_next_message()
        while msg:
            if isinstance(msg, messaging.SyncMessage):
                LOGGER.debug(
                    'UNSYNCED +1 %s' % getattr(msg, 'objname', ''))
                increment('unsynced')
            elif isinstance(msg, messaging.AckSyncMessage):
                LOGGER.debug('SYNCED +1 %s' % getattr(msg, 'objname', ''))
                increment('synced')
            elif isinstance(msg, messaging.SyncErrorMessage):
                LOGGER.debug('FAILED +1 %s' % getattr(msg, 'objname', ''))
                increment('failed')
            elif isinstance(msg, messaging.LocalfsSyncDisabled):
                LOGGER.debug(
                    'STOP BACKEND, %s' % getattr(msg, 'objname', ''))
                LOGGER.debug(
                    'CHANGE STATUS TO: %s' % STATUS['DIRECTORY ERROR'])
                self.set_status(code=STATUS['DIRECTORY ERROR'])
                backend.stop_all_daemons()
            elif isinstance(msg, messaging.PithosSyncDisabled):
                LOGGER.debug(
                    'STOP BACKEND, %s' % getattr(msg, 'objname', ''))
                self.set_status(code=STATUS['CONTAINER ERROR'])
                backend.stop_all_daemons()
            elif isinstance(msg, messaging.PithosAuthTokenError):
                LOGGER.debug(
                    'STOP BACKEND, %s' % getattr(msg, 'objname', ''))
                self.set_status(code=STATUS['TOKEN ERROR'])
                backend.stop_all_daemons()
            elif isinstance(msg, messaging.PithosGenericError):
                LOGGER.debug(
                    'STOP BACKEND, %s' % getattr(msg, 'objname', ''))
                self.set_status(code=STATUS['CRITICAL ERROR'])
                backend.stop_all_daemons()
            LOGGER.debug('Backend message: %s %s' % (msg.name, type(msg)))
            # Limit the amount of messages consumed each time
            max_consumption -= 1
            if not max_consumption:
                break
            msg = backend.get_next_message()
475 476 477 478 479

    def can_sync(self):
        """Check if settings are enough to setup a syncing process"""
        essential_values = [self.settings[name] for name in self.essentials]
        return all(essential_values)

480 481
    def init_sync(self):
        """Initialize syncer

        A syncer is built from the current settings and two messages are
        consumed from it to check that both the local and the remote side
        are enabled. The syncer (or None, if it is not ready) is registered
        under key 0 and the status code is set according to the outcome.
        """
        self.set_status(code=STATUS['INITIALIZING'])
        sync = self._get_default_sync()

        kwargs = dict(agkyra_path=AGKYRA_DIR)
        # Get SSL settings
        cloud = self._get_sync_cloud(sync)
        try:
            ignore_ssl = self.cnf.get_cloud(cloud, 'ignore_ssl') in ('on', )
        except KeyError:
            ignore_ssl = None
        else:
            kwargs['ignore_ssl'] = ignore_ssl
        if not ignore_ssl:
            # CA certificates are looked up only if SSL is not ignored
            try:
                kwargs['ca_certs'] = self.cnf.get_cloud(cloud, 'ca_certs')
            except KeyError:
                pass

        syncer_ = None
        try:
            syncer_settings = setup.SyncerSettings(
                self.settings['url'], self.settings['token'],
                self.settings['container'], self.settings['directory'],
                **kwargs)
            master = pithos_client.PithosFileClient(syncer_settings)
            slave = localfs_client.LocalfsFileClient(syncer_settings)
            syncer_ = syncer.FileSyncer(syncer_settings, master, slave)
            self.syncer_settings = syncer_settings

            # Check if syncer is ready, by consuming messages
            local_ok = remote_ok = False
            for attempt in (1, 2):
                LOGGER.debug('Get message %s' % attempt)
                msg = syncer_.get_next_message(block=True)
                LOGGER.debug('Got message: %s' % msg)

                if isinstance(msg, messaging.LocalfsSyncDisabled):
                    self.set_status(code=STATUS['DIRECTORY ERROR'])
                    local_ok = False
                    break
                elif isinstance(msg, messaging.PithosSyncDisabled):
                    self.set_status(code=STATUS['CONTAINER ERROR'])
                    remote_ok = False
                    break
                elif isinstance(msg, messaging.LocalfsSyncEnabled):
                    local_ok = True
                elif isinstance(msg, messaging.PithosSyncEnabled):
                    remote_ok = True
                else:
                    LOGGER.error('Unexpected message %s' % msg)
                    self.set_status(code=STATUS['CRITICAL ERROR'])
                    break

            if local_ok and remote_ok:
                syncer_.initiate_probe()
                self.set_status(code=STATUS['SYNCING'])
            else:
                # Not ready: do not register the syncer
                syncer_ = None
        except pithos_client.ClientError as ce:
            LOGGER.debug('backend init failed: %s %s' % (ce, ce.status))
            try:
                code = {
                    400: STATUS['AUTH URL ERROR'],
                    401: STATUS['TOKEN ERROR'],
                }[ce.status]
            except KeyError:
                code = STATUS['UNINITIALIZED']
            self.set_status(code=code)
        finally:
            self.set_status(synced=0, unsynced=0)
            with SYNCERS.lock() as registry:
                registry[0] = syncer_
552

553 554 555 556 557
    # Syncer-related methods
    def get_settings(self):
        """:returns: the settings dict itself (not a copy)"""
        return self.settings

    def set_settings(self, new_settings):
        """Set the settings and dump them to permanent storage if needed

        If the essential settings are reported as changed, an active backend
        is shut down before saving and, if the reloaded settings allow it,
        is initialized again afterwards.
        """
        # Prepare setting save
        old_status = self.get_status('code')
        active = (STATUS['SYNCING'], STATUS['PAUSING'], STATUS['PAUSED'])

        must_reset_syncing = self._essentials_changed(new_settings)
        if must_reset_syncing and old_status in active:
            LOGGER.debug('Temporary backend shutdown to save settings')
            self.shutdown_syncer()

        # save settings
        self.settings.update(new_settings)
        self._dump_settings()

        # Restart
        LOGGER.debug('Reload settings')
        self._load_settings()
        if not (must_reset_syncing and self.can_sync()):
            return
        LOGGER.debug('Restart backend')
        self.init_sync()
        if self.get_status('code') not in active:
            return

        # Resume syncing if it was on before, or if this is a fresh start
        # and sync_on_start is set; otherwise stay paused
        if old_status == STATUS['SYNCING']:
            must_sync = True
        elif old_status in active:
            must_sync = False
        else:
            must_sync = self.settings.get('sync_on_start', False)
        if must_sync:
            self.start_sync()
        else:
            self.pause_sync()
586

587 588 589
    def _pause_syncer(self):
        """Stop deciding and wait for the open syncs to complete"""
        backend = self.syncer
        backend.stop_decide()
        LOGGER.debug('Wait open syncs to complete')
        backend.wait_sync_threads()

    def pause_sync(self):
        """Pause syncing (assuming it is up and running)"""
        backend = self.syncer
        if not backend:
            return
        self.set_status(code=STATUS['PAUSING'])
        backend.stop_decide()
        self.set_status(code=STATUS['PAUSED'])
599 600

    def start_sync(self):
        """Start syncing"""
        backend = self.syncer
        backend.start_decide()
        self.set_status(code=STATUS['SYNCING'])
604

605 606 607 608 609 610 611 612 613 614 615
    def force_sync(self):
        """Force syncing, assuming there is a directory or container problem"""
        self.set_status(code=STATUS['INITIALIZING'])
        self.syncer_settings.purge_db_archives_and_enable()
        self.init_sync()
        if not self.syncer:
            # Initialization did not produce a syncer
            self.set_status(code=STATUS['CRITICAL ERROR'])
            return
        self.syncer.start_decide()
        self.set_status(code=STATUS['SYNCING'])

616
    def send_json(self, msg):
        """Log msg and send it to the peer, serialized as JSON"""
        LOGGER.debug('send: %s' % msg)
        payload = json.dumps(msg)
        self.send(payload)

    # Protocol handling methods
    def _post(self, r):
        """Handle POST requests"""
        if self.accepted:
            action = r['path']
            if action == 'shutdown':
626 627
                # Clean db to cause syncer backend to shut down
                self.set_status(code=STATUS['SHUTTING DOWN'])
628
                self.shutdown_syncer()
629
                self.clean_db()
630 631 632
                return
            {
                'start': self.start_sync,
633 634
                'pause': self.pause_sync,
                'force': self.force_sync
635 636
            }[action]()
            self.send_json({'OK': 200, 'action': 'post %s' % action})
637
        elif r['ui_id'] == self.ui_id:
638
            self.accepted = True
639
            self.send_json({'ACCEPTED': 202, 'action': 'post ui_id'})
640
            self._load_settings()
641
            if (not self.syncer) and self.can_sync():
642
                self.init_sync()
643
                if self.syncer and self.settings['sync_on_start']:
644
                    self.start_sync()
645
        else:
646
            action = r.get('path', 'ui_id')
647 648 649 650 651
            self.send_json({'REJECTED': 401, 'action': 'post %s' % action})
            self.terminate()

    def _put(self, r):
        """Handle PUT requests"""
652
        if self.accepted:
653
            LOGGER.debug('put %s' % r)
654 655 656 657
            action = r.pop('path')
            self.set_settings(r)
            r.update({'CREATED': 201, 'action': 'put %s' % action})
            self.send_json(r)
658 659
        else:
            action = r['path']
660 661
            self.send_json({
                'UNAUTHORIZED UI': 401, 'action': 'put %s' % action})
662
            self.terminate()
663 664 665 666 667

    def _get(self, r):
        """Handle GET requests"""
        action = r.pop('path')
        if not self.accepted:
668 669
            self.send_json({
                'UNAUTHORIZED UI': 401, 'action': 'get %s' % action})
670 671 672 673 674 675 676 677 678 679 680 681 682 683 684
            self.terminate()
        else:
            data = {
                'settings': self.get_settings,
                'status': self.get_status,
            }[action]()
            data['action'] = 'get %s' % action
            self.send_json(data)

    def received_message(self, message):
        """Route requests to corresponding handling methods

        :param message: the incoming message, expected to be a JSON object
            with a "method" (post, put or get) and usually a "path"

        Errors are reported to the peer: 400 for malformed requests, the
        status of a setup.ClientError, or 500 (followed by termination of
        the connection) for anything else.
        """
        try:
            r = json.loads('%s' % message)
        except ValueError as ve:
            self.send_json({'BAD REQUEST': 400})
            LOGGER.error('JSON ERROR: %s' % ve)
            return
        if not isinstance(r, dict):
            # Valid JSON, but not an object: there is nothing to route
            self.send_json({'BAD REQUEST': 400})
            LOGGER.error('JSON ERROR: request is not an object')
            return

        # Bind these before dispatching, so that the error handlers can
        # always use them: "method" may be missing and the handlers may pop
        # "path" from the request
        method = r.pop('method', None)
        path = r.get('path')
        try:
            {
                'post': self._post,
                'put': self._put,
                'get': self._get
            }[method](r)
        except KeyError as ke:
            action = '%s %s' % (method or '', path or '')
            self.send_json({'BAD REQUEST': 400, 'action': action})
            LOGGER.error('KEY ERROR: %s' % ke)
        except setup.ClientError as ce:
            if path is None:
                path = 'ui_id' if 'ui_id' in r else ''
            action = '%s %s' % (method, path)
            self.send_json({'%s' % ce: ce.status, 'action': action})
            return
        except Exception as e:
            self.send_json({'INTERNAL ERROR': 500})
            reason = '%s %s' % (method or '', r)
            LOGGER.error('EXCEPTION (%s): %s' % (reason, e))
            self.terminate()
708 709


Giorgos Korfiatis's avatar
Giorgos Korfiatis committed
710 711 712 713 714 715 716 717 718 719 720 721 722 723 724 725 726 727 728 729 730 731 732 733 734 735 736 737 738 739 740 741 742 743 744 745 746 747 748 749 750 751 752 753 754 755 756 757 758 759 760 761 762 763 764 765 766 767 768 769 770 771 772 773 774 775 776 777 778 779 780
def close_fds():
    """Close every file descriptor from 2 up to the hard RLIMIT_NOFILE

    Descriptors 0 and 1 are left untouched. If the limit is infinite, 1024
    is used as the upper bound.
    """
    import resource
    # Default maximum for the number of available file descriptors.
    default_maxfd = 1024

    hard_limit = resource.getrlimit(resource.RLIMIT_NOFILE)[1]
    upper = default_maxfd if hard_limit == resource.RLIM_INFINITY else (
        hard_limit)

    # Descriptors which are not open are silently skipped
    os.closerange(2, upper)


# Adapted from https://code.activestate.com/recipes/278731/
def daemonize(command):
    """Run command as a daemon, detached from the calling process

    :param command: arguments for os.execlp, i.e. the program to look up in
        PATH followed by its argument list (argv[0] included)

    The calling process forks and waits for its child. The child becomes a
    session leader, forks the actual daemon and exits immediately, so that
    the daemon is not left as a zombie. The daemon changes directory to "/",
    closes the inherited file descriptors, redirects the standard streams to
    the null device and executes the command.
    """
    # Default working directory for the daemon.
    workdir = "/"
    # The standard I/O file descriptors are redirected to the null device.
    redirect_to = getattr(os, "devnull", "/dev/null")

    if os.fork():
        # Caller: reap the intermediate child, which exits right away
        os.wait()
        return

    # To become the session leader of this new session and the process
    # group leader of the new process group, we call os.setsid(). The
    # process is also guaranteed not to have a controlling terminal.
    os.setsid()

    # Fork a second child and exit immediately to prevent zombies.
    if os.fork():
        os._exit(0)

    try:
        # Since the current working directory may be a mounted filesystem,
        # we avoid the issue of not being able to unmount the filesystem at
        # shutdown time by changing it to the root directory.
        os.chdir(workdir)

        # Close the file descriptors inherited from the parent.
        close_fds()

        # Redirect stdin, stdout and stderr to the null device. The
        # descriptor returned by os.open is used explicitly: it is not
        # necessarily 0, since close_fds() may leave low descriptors open.
        devnull = os.open(redirect_to, os.O_RDWR)
        for std_fd in (0, 1, 2):
            if std_fd != devnull:
                os.dup2(devnull, std_fd)
        if devnull > 2:
            os.close(devnull)
        elif hasattr(os, 'set_inheritable'):
            # On Python 3, descriptors returned by os.open are closed on
            # exec unless they are explicitly made inheritable
            os.set_inheritable(devnull, True)

        os.execlp(*command)
    finally:
        # Reached only if something failed before or during exec: never let
        # the forked process fall back into the caller's code
        os._exit(1)


Giorgos Korfiatis's avatar
Giorgos Korfiatis committed
781
def launch_server(callback, debug):
    """Launch the server in a separate process

    :param callback: the executable (or script) which starts the daemon
    :param debug: if true, '-d' is appended to the daemon options
    """
    LOGGER.info('Start SessionHelper session')
    opts = ["start", "daemon"] + (['-d'] if debug else [])
    if not utils.iswin():
        # execlp style arguments: the file to execute, then argv
        daemonize([callback, callback] + opts)
        return
    # A frozen bundle is executed directly, a script through pythonw.exe
    interpreter = [] if ISFROZEN else ["pythonw.exe"]
    subprocess.Popen(interpreter + [callback] + opts, close_fds=True)