protocol.py 6.96 KB
Newer Older
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
from ws4py.websocket import WebSocket
import json
import logging
from os.path import abspath
from agkyra.syncer import syncer
from agkyra.config import AgkyraConfig
from kamaki.clients.astakos import AstakosClient
from kamaki.clients.pithos import PithosClient


LOG = logging.getLogger(__name__)


class WebSocketProtocol(WebSocket):
    """Helper-side WebSocket protocol for communication with GUI:

    -- INTERNAL HANDSHAKE --
    GUI: {"method": "post", "gui_id": <GUI ID>}
    HELPER: {"ACCEPTED": 202, "action": "post gui_id"} or
        {"REJECTED": 401, "action": "post gui_id"}

    -- SHUT DOWN --
    GUI: {"method": "post", "path": "shutdown"}
    HELPER: (no response, the connection is closed)

    -- PAUSE --
    GUI: {"method": "post", "path": "pause"}
    HELPER: {"OK": 200, "action": "post pause"} or error

    -- START --
    GUI: {"method": "post", "path": "start"}
    HELPER: {"OK": 200, "action": "post start"} or error

    -- GET SETTINGS --
    GUI: {"method": "get", "path": "settings"}
    HELPER:
        {
            "action": "get settings",
            "token": <user token>,
            "url": <auth url>,
            "container": <container>,
            "directory": <local directory>,
            "exclude": <file path>,
            "pithos_ui": <pithos UI URL>
        } or {<ERROR>: <ERROR CODE>}

    -- PUT SETTINGS --
    GUI: {
            "method": "put", "path": "settings",
            "token": <user token>,
            "url": <auth url>,
            "container": <container>,
            "directory": <local directory>,
            "pithos_url": <pithos URL>,
            "exclude": <file path>
        }
    HELPER: {"CREATED": 201, "action": "put settings", <submitted settings>}
        or {<ERROR>: <ERROR CODE>, "action": "put settings"}

    -- GET STATUS --
    GUI: {"method": "get", "path": "status"}
    HELPER: {"progress": <int>, "paused": <boolean>, "action": "get status"} or
        {<ERROR>: <ERROR CODE>, "action": "get status"}

    -- ERRORS --
    Malformed requests (invalid JSON, non-object payload, missing or unknown
    "method"/"path") are answered with {"BAD REQUEST": 400}. Any other
    failure is answered with {"INTERNAL ERROR": 500} and the connection is
    terminated.
    """

    # Identifier the GUI must present in the handshake (compared in _post)
    gui_id = None
    # Becomes True once the handshake succeeded on this connection
    accepted = False
    # NOTE: 'settings' and 'status' are class-level dicts, i.e. they are
    # shared by all instances until an instance rebinds them (set_settings).
    settings = dict(
        token=None, url=None,
        container=None, directory=None,
        exclude=None, pithos_ui=None)
    status = dict(progress=0, paused=True)
    # Not used by any method of this class
    file_syncer = None
    # Configuration object, created once when the class is defined
    cnf = AgkyraConfig()

    def _load_settings(self):
        """Populate self.settings from the configuration of the default sync.

        The url and token are read from the cloud of the default sync, the
        pithos_ui is looked up in the service catalog (best effort: a failure
        is only logged) and container, directory, exclude are read from the
        sync itself.
        """
        sync = self.cnf.get('global', 'default_sync')
        cloud = self.cnf.get_sync(sync, 'cloud')

        url = self.cnf.get_cloud(cloud, 'url')
        token = self.cnf.get_cloud(cloud, 'token')

        astakos = AstakosClient(url, token)
        self.settings['url'], self.settings['token'] = url, token

        try:
            endpoints = astakos.get_endpoints()['access']['serviceCatalog']
            for endpoint in endpoints:
                if endpoint['type'] == PithosClient.service_type:
                    pithos_ui = endpoint['endpoints'][0]['SNF:uiURL']
                    self.settings['pithos_ui'] = pithos_ui
                    break
        except Exception as e:
            # Deliberately best effort: pithos_ui keeps its previous value
            LOG.debug('Failed to retrieve pithos_ui: %s', e)

        for option in ('container', 'directory', 'exclude'):
            self.settings[option] = self.cnf.get_sync(sync, option)

    def _dump_settings(self):
        """Write self.settings to the configuration of the default sync.

        :raises KeyError: if self.settings lacks one of url, token,
            directory, container, exclude
        """
        sync = self.cnf.get('global', 'default_sync')
        cloud = self.cnf.get_sync(sync, 'cloud')

        # If the URL changed, do not overwrite the current cloud: keep
        # appending the sync name to the cloud name until a name is found for
        # which get_cloud raises KeyError, and store the new URL under it
        old_url = self.cnf.get_cloud(cloud, 'url')
        while old_url != self.settings['url']:
            cloud = '%s_%s' % (cloud, sync)
            try:
                self.cnf.get_cloud(cloud, 'url')
            except KeyError:
                break

        self.cnf.set_cloud(cloud, 'url', self.settings['url'])
        self.cnf.set_cloud(cloud, 'token', self.settings['token'])
        self.cnf.set_sync(sync, 'cloud', cloud)

        for option in ('directory', 'container', 'exclude'):
            self.cnf.set_sync(sync, option, self.settings[option])

    # Syncer-related methods
    def get_status(self):
        """Advance the progress counter and return the status dict.

        While progress is below 100 it is incremented by one with a chance of
        one in three (randint(0, 2) == 0) on every call.

        :returns: the live self.status dict (not a copy)
        """
        from random import randint
        if self.status['progress'] < 100:
            self.status['progress'] += 0 if randint(0, 2) else 1
        return self.status

    def get_settings(self):
        """Reload the settings from the configuration and return them.

        :returns: the live self.settings dict (not a copy)
        """
        self._load_settings()
        return self.settings

    def set_settings(self, new_settings):
        """Replace the settings and write them to the configuration.

        :param new_settings: (dict) the new settings; it is copied, so later
            changes to the caller's dict do not leak into self.settings
        """
        self.settings = dict(new_settings)
        self._dump_settings()

    def pause_sync(self):
        """Mark the sync as paused"""
        self.status['paused'] = True

    def start_sync(self):
        """Mark the sync as running (not paused)"""
        self.status['paused'] = False

    # WebSocket connection methods
    def opened(self):
        """Load the settings as soon as the connection is established"""
        LOG.debug('Helper: connection established')
        self._load_settings()

    def closed(self, *args):
        """Log that the connection was closed"""
        LOG.debug('Helper: connection closed')

    def send_json(self, msg):
        """Serialize msg as JSON and send it to the GUI.

        NOTE: the message is logged at debug level and a settings message
        contains the user token.
        """
        LOG.debug('send: %s', msg)
        self.send(json.dumps(msg))

    # Protocol handling methods
    def _post(self, r):
        """Handle POST requests.

        Before the handshake, the request must carry the expected gui_id,
        otherwise it is rejected and the connection is terminated. After the
        handshake, the "path" selects the action: shutdown, start or pause.

        :raises KeyError: if a required key is missing or the path is unknown
            (reported as BAD REQUEST by received_message)
        """
        LOG.debug('CALLED with %s', r)
        if self.accepted:
            action = r['path']
            if action == 'shutdown':
                self.close()
                return
            {
                'start': self.start_sync,
                'pause': self.pause_sync
            }[action]()
            self.send_json({'OK': 200, 'action': 'post %s' % action})
        elif r['gui_id'] == self.gui_id:
            self.accepted = True
            self.send_json({'ACCEPTED': 202, 'action': 'post gui_id'})
        else:
            action = r.get('path', 'gui_id')
            self.send_json({'REJECTED': 401, 'action': 'post %s' % action})
            self.terminate()

    def _put(self, r):
        """Handle PUT requests.

        Store the submitted settings and echo them back along with the
        CREATED code. Unauthorized requests terminate the connection.

        :raises KeyError: if "path" or a required setting is missing
            (reported as BAD REQUEST by received_message)
        """
        if not self.accepted:
            action = r['path']
            self.send_json({'UNAUTHORIZED': 401, 'action': 'put %s' % action})
            self.terminate()
        else:
            LOG.debug('put %s', r)
            action = r.pop('path')
            self.set_settings(r)
            # Build the response on a copy, so that the response-only keys
            # (CREATED, action) never end up in the stored settings
            response = dict(r)
            response.update({'CREATED': 201, 'action': 'put %s' % action})
            self.send_json(response)

    def _get(self, r):
        """Handle GET requests.

        The "path" selects what is returned: settings or status.
        Unauthorized requests terminate the connection.

        :raises KeyError: if "path" is missing or unknown
            (reported as BAD REQUEST by received_message)
        """
        action = r.pop('path')
        if not self.accepted:
            self.send_json({'UNAUTHORIZED': 401, 'action': 'get %s' % action})
            self.terminate()
        else:
            # Copy the result: get_settings/get_status return the live dicts
            # and the "action" key must not be stored in them
            data = dict({
                'settings': self.get_settings,
                'status': self.get_status,
            }[action]())
            data['action'] = 'get %s' % action
            self.send_json(data)

    def received_message(self, message):
        """Route requests to corresponding handling methods.

        Invalid JSON, a payload that is not a JSON object and missing or
        unknown keys are answered with BAD REQUEST. Any other exception is
        answered with INTERNAL ERROR and terminates the connection.
        """
        LOG.debug('recv: %s', message)
        try:
            r = json.loads('%s' % message)
        except ValueError as ve:
            self.send_json({'BAD REQUEST': 400})
            LOG.error('JSON ERROR: %s', ve)
            return
        if not isinstance(r, dict):
            # Valid JSON but not an object (e.g. a list or a number): this is
            # a malformed request, not an internal error
            self.send_json({'BAD REQUEST': 400})
            LOG.error('JSON ERROR: request is not a JSON object')
            return
        try:
            method = r.pop('method')
            {
                'post': self._post,
                'put': self._put,
                'get': self._get
            }[method](r)
        except KeyError as ke:
            self.send_json({'BAD REQUEST': 400})
            LOG.error('KEY ERROR: %s', ke)
        except Exception as e:
            self.send_json({'INTERNAL ERROR': 500})
            LOG.error('EXCEPTION: %s', e)
            self.terminate()