protocol.py 6.95 KB
Newer Older
1
from ws4py.websocket import WebSocket
2
3
import json
import logging
4
from os.path import abspath
5
from titanic import syncer
6
7
8
from config import AgkyraConfig
from kamaki.clients.astakos import AstakosClient
from kamaki.clients.pithos import PithosClient
9
10
11


LOG = logging.getLogger(__name__)
12
13
14
15
16
17


class WebSocketProtocol(WebSocket):
    """Helper-side WebSocket protocol for communication with GUI:

    -- INTERRNAL HANDSAKE --
18
    GUI: {"method": "post", "gui_id": <GUI ID>}
19
20
    HELPER: {"ACCEPTED": 202, "method": "post"}" or
        "{"REJECTED": 401, "action": "post gui_id"}
21
22
23

    -- SHUT DOWN --
    GUI: {"method": "post", "path": "shutdown"}
24

25
26
    -- PAUSE --
    GUI: {"method": "post", "path": "pause"}
27
    HELPER: {"OK": 200, "action": "post pause"} or error
28
29
30

    -- start --
    GUI: {"method": "post", "path": "start"}
31
    HELPER: {"OK": 200, "action": "post start"} or error
32

33
34
35
36
    -- GET SETTINGS --
    GUI: {"method": "get", "path": "settings"}
    HELPER:
        {
37
            "action": "get settings",
38
39
40
41
42
            "token": <user token>,
            "url": <auth url>,
            "container": <container>,
            "directory": <local directory>,
            "exclude": <file path>
43
        } or {<ERROR>: <ERROR CODE>}
44
45
46
47
48
49
50
51

    -- PUT SETTINGS --
    GUI: {
            "method": "put", "path": "settings",
            "token": <user token>,
            "url": <auth url>,
            "container": <container>,
            "directory": <local directory>,
52
            "pithos_url": <pithos URL>,
53
54
            "exclude": <file path>
        }
55
56
    HELPER: {"CREATED": 201, "action": "put settings",} or
        {<ERROR>: <ERROR CODE>, "action": "get settings",}
57
58
59

    -- GET STATUS --
    GUI: {"method": "get", "path": "status"}
60
61
    HELPER: {"progress": <int>, "paused": <boolean>, "action": "get status"} or
        {<ERROR>: <ERROR CODE>, "action": "get status"}
62
63
    """

64
65
    gui_id = None
    accepted = False
66
    settings = dict(
67
68
69
        token=None, url=None,
        container=None, directory=None,
        exclude=None, pithos_ui=None)
70
71
    status = dict(progress=0, paused=True)
    file_syncer = None
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
    cnf = AgkyraConfig()

    def _load_settings(self):
        sync = self.cnf.get('global', 'default_sync')
        cloud = self.cnf.get_sync(sync, 'cloud')

        url = self.cnf.get_cloud(cloud, 'url')
        token = self.cnf.get_cloud(cloud, 'token')

        astakos = AstakosClient(url, token)
        self.settings['url'], self.settings['token'] = url, token

        try:
            endpoints = astakos.get_endpoints()['access']['serviceCatalog']
            for endpoint in endpoints:
                if endpoint['type'] == PithosClient.service_type:
                    pithos_ui = endpoint['endpoints'][0]['SNF:uiURL']
                    self.settings['pithos_ui'] = pithos_ui
                    break
        except Exception as e:
            LOG.debug('Failed to retrieve pithos_ui: %s' % e)

        for option in ('container', 'directory', 'exclude'):
            self.settings[option] = self.cnf.get_sync(sync, option)

    def _dump_settings(self):
        sync = self.cnf.get('global', 'default_sync')
        cloud = self.cnf.get_sync(sync, 'cloud')

        old_url = self.cnf.get_cloud(cloud, 'url')
        while old_url != self.settings['url']:
            cloud = '%s_%s' % (cloud, sync)
            try:
                self.cnf.get_cloud(cloud, 'url')
            except KeyError:
                break

        self.cnf.set_cloud(cloud, 'url', self.settings['url'])
        self.cnf.set_cloud(cloud, 'token', self.settings['token'])
        self.cnf.set_sync(sync, 'cloud', cloud)

        for option in ('directory', 'container', 'exclude'):
            self.cnf.set_sync(sync, option, self.settings[option])
115
116
117

    # Syncer-related methods
    def get_status(self):
118
119
120
        from random import randint
        if self.status['progress'] < 100:
            self.status['progress'] += 0 if randint(0, 2) else 1
121
        return self.status
122
123

    def get_settings(self):
124
        self._load_settings()
125
126
        return self.settings

127
128
    def set_settings(self, new_settings):
        self.settings = new_settings
129
        self._dump_settings()
130

131
132
133
134
135
    def pause_sync(self):
        self.status['paused'] = True

    def start_sync(self):
        self.status['paused'] = False
136

137
    # WebSocket connection methods
138
    def opened(self):
139
        LOG.debug('Helper: connection established')
140
        self._load_settings()
141
142

    def closed(self, *args):
143
144
145
146
147
148
149
150
151
        LOG.debug('Helper: connection closed')

    def send_json(self, msg):
        LOG.debug('send: %s' % msg)
        self.send(json.dumps(msg))

    # Protocol handling methods
    def _post(self, r):
        """Handle POST requests"""
152
        LOG.debug('CALLED with %s' % r)
153
        if self.accepted:
154
155
            action = r['path']
            if action == 'shutdown':
156
                self.close()
157
158
159
160
161
                return
            {
                'start': self.start_sync,
                'pause': self.pause_sync
            }[action]()
162
            self.send_json({'OK': 200, 'action': 'post %s' % action})
163
164
        elif r['gui_id'] == self.gui_id:
            self.accepted = True
165
            self.send_json({'ACCEPTED': 202, 'action': 'post gui_id'})
166
        else:
167
168
            action = r.get('path', 'gui_id')
            self.send_json({'REJECTED': 401, 'action': 'post %s' % action})
169
170
171
172
173
            self.terminate()

    def _put(self, r):
        """Handle PUT requests"""
        if not self.accepted:
174
175
            action = r['path']
            self.send_json({'UNAUTHORIZED': 401, 'action': 'put %s' % action})
176
177
178
            self.terminate()
        else:
            LOG.debug('put %s' % r)
179
180
181
182
            action = r.pop('path')
            self.set_settings(r)
            r.update({'CREATED': 201, 'action': 'put %s' % action})
            self.send_json(r)
183
184
185

    def _get(self, r):
        """Handle GET requests"""
186
        action = r.pop('path')
187
        if not self.accepted:
188
            self.send_json({'UNAUTHORIZED': 401, 'action': 'get %s' % action})
189
190
191
192
193
            self.terminate()
        else:
            data = {
                'settings': self.get_settings,
                'status': self.get_status,
194
195
            }[action]()
            data['action'] = 'get %s' % action
196
            self.send_json(data)
197
198

    def received_message(self, message):
199
200
201
202
203
204
205
206
207
        """Route requests to corresponding handling methods"""
        LOG.debug('recv: %s' % message)
        try:
            r = json.loads('%s' % message)
        except ValueError as ve:
            self.send_json({'BAD REQUEST': 400})
            LOG.error('JSON ERROR: %s' % ve)
            return
        try:
208
            method = r.pop('method')
209
210
211
212
            {
                'post': self._post,
                'put': self._put,
                'get': self._get
213
            }[method](r)
214
215
216
217
218
219
220
        except KeyError as ke:
            self.send_json({'BAD REQUEST': 400})
            LOG.error('KEY ERROR: %s' % ke)
        except Exception as e:
            self.send_json({'INTERNAL ERROR': 500})
            LOG.error('EXCEPTION: %s' % e)
            self.terminate()