# protocol.py
from ws4py.websocket import WebSocket
import json
import logging
from os.path import abspath
5
from agkyra.syncer import syncer, setup, pithos_client, localfs_client
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
from agkyra.config import AgkyraConfig


LOG = logging.getLogger(__name__)


class WebSocketProtocol(WebSocket):
    """Helper-side WebSocket protocol for communication with GUI:

    -- INTERNAL HANDSHAKE --
    GUI: {"method": "post", "gui_id": <GUI ID>}
    HELPER: {"ACCEPTED": 202, "action": "post gui_id"} or
        {"REJECTED": 401, "action": "post gui_id"}

    -- SHUT DOWN --
    GUI: {"method": "post", "path": "shutdown"}

    -- PAUSE --
    GUI: {"method": "post", "path": "pause"}
    HELPER: {"OK": 200, "action": "post pause"} or error

    -- START --
    GUI: {"method": "post", "path": "start"}
    HELPER: {"OK": 200, "action": "post start"} or error

    -- GET SETTINGS --
    GUI: {"method": "get", "path": "settings"}
    HELPER:
        {
            "action": "get settings",
            "token": <user token>,
            "url": <auth url>,
            "container": <container>,
            "directory": <local directory>,
            "exclude": <file path>
        } or {<ERROR>: <ERROR CODE>}

    -- PUT SETTINGS --
    GUI: {
            "method": "put", "path": "settings",
            "token": <user token>,
            "url": <auth url>,
            "container": <container>,
            "directory": <local directory>,
            "exclude": <file path>
        }
    HELPER: {"CREATED": 201, "action": "put settings"} or
        {<ERROR>: <ERROR CODE>, "action": "put settings"}

    -- GET STATUS --
    GUI: {"method": "get", "path": "status"}
    HELPER: {
        "can_sync": <boolean>,
        "progress": <int>,
        "paused": <boolean>,
        "action": "get status"} or
        {<ERROR>: <ERROR CODE>, "action": "get status"}
    """

    # Identifier the GUI must present in its first POST (see _post)
    gui_id = None
    # Becomes True once the handshake has succeeded
    accepted = False
    # NOTE: settings and status are class-level dicts; until an instance
    # rebinds them, in-place updates are visible through every instance
    settings = dict(
        token=None, url=None,
        container=None, directory=None,
        exclude=None)
    status = dict(progress=0, paused=True, can_sync=False)
    # NOTE(review): file_syncer is not used by any method of this class; the
    # methods work with self.syncer instead
    file_syncer = None
    # The syncer built by init_sync; None until syncing has been set up
    syncer = None
    # Configuration object, created once when the class is defined
    cnf = AgkyraConfig()
    # Settings that must all be set before syncing can start (see can_sync)
    essentials = ('url', 'token', 'container', 'directory')
75

76
77
78
79
    def _get_default_sync(self):
        """Get global.default_sync or pick the first sync as default
        If there are no syncs, create a 'default' sync.
        """
80
        sync = self.cnf.get('global', 'default_sync')
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
        if not sync:
            for sync in self.cnf.keys('sync'):
                break
            self.cnf.set('global', 'default_sync', sync or 'default')
        return sync or 'default'

    def _get_sync_cloud(self, sync):
        """Get the <sync>.cloud or pick the first cloud and use it
        In case of cloud picking, set the cloud as the <sync>.cloud for future
        sessions.
        If no clouds are found, create a 'default' cloud, with an empty url.
        """
        try:
            cloud = self.cnf.get_sync(sync, 'cloud')
        except KeyError:
            cloud = None
        if not cloud:
            for cloud in self.cnf.keys('cloud'):
                break
            self.cnf.set_sync(sync, 'cloud', cloud or 'default')
        return cloud or 'default'
102

103
104
105
106
    def _load_settings(self):
        LOG.debug('Start loading settings')
        sync = self._get_default_sync()
        cloud = self._get_sync_cloud(sync)
107
108

        try:
109
110
111
112
113
114
115
            self.settings['url'] = self.cnf.get_cloud(cloud, 'url')
        except Exception:
            self.settings['url'] = None
        try:
            self.settings['token'] = self.cnf.get_cloud(cloud, 'token')
        except Exception:
            self.settings['url'] = None
116
117

        for option in ('container', 'directory', 'exclude'):
118
119
120
121
            try:
                self.settings[option] = self.cnf.get_sync(sync, option)
            except KeyError:
                LOG.debug('No %s is set' % option)
122

123
124
        LOG.debug('Finished loading settings')

125
    def _dump_settings(self):
126
        LOG.debug('Saving settings')
127
128
129
130
131
132
133
134
135
136
137
        if not self.settings.get('url', None):
            LOG.debug('No settings to save')
            return

        sync = self._get_default_sync()
        cloud = self._get_sync_cloud(sync)

        try:
            old_url = self.cnf.get_cloud(cloud, 'url') or ''
        except KeyError:
            old_url = self.settings['url']
138
139
140
141
142
143
144
145
146

        while old_url != self.settings['url']:
            cloud = '%s_%s' % (cloud, sync)
            try:
                self.cnf.get_cloud(cloud, 'url')
            except KeyError:
                break

        self.cnf.set_cloud(cloud, 'url', self.settings['url'])
147
        self.cnf.set_cloud(cloud, 'token', self.settings['token'] or '')
148
149
150
        self.cnf.set_sync(sync, 'cloud', cloud)

        for option in ('directory', 'container', 'exclude'):
151
            self.cnf.set_sync(sync, option, self.settings[option] or '')
152

153
154
155
        self.cnf.write()
        LOG.debug('Settings saved')

156
157
    def can_sync(self):
        """Check if settings are enough to setup a syncing proccess"""
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
        return all([self.settings[e] for e in self.essentials])

    def _essentials_changed(self, new_settings):
        """Check if essential settings have changed in new_settings"""
        return all([
            self.settings[e] == self.settings[e] for e in self.essentials])

    def init_sync(self):
        """Build a syncer from the current settings and run a first sync

        Stores the new syncer as self.syncer and its settings as
        self.syncer_settings, then calls probe_and_sync_all() on it.
        """
        current = self.settings
        # NOTE(review): ignore_ssl=True is hard-coded; presumably it turns
        # off SSL certificate checks -- confirm this is intended
        syncer_settings = setup.SyncerSettings(
            self._get_default_sync(),
            current['url'], current['token'],
            current['container'], current['directory'],
            ignore_ssl=True)
        remote = pithos_client.PithosFileClient(syncer_settings)
        local = localfs_client.LocalfsFileClient(syncer_settings)
        self.syncer = syncer.FileSyncer(syncer_settings, remote, local)
        self.syncer_settings = syncer_settings
        self.syncer.probe_and_sync_all()
178

179
180
    # Syncer-related methods
    def get_status(self):
181
182
        self.status['paused'] = self.syncer.paused
        self.status['progress'] = 50
183
        self.status['can_sync'] = self.can_sync()
184
185
186
187
188
189
        return self.status

    def get_settings(self):
        return self.settings

    def set_settings(self, new_settings):
190
191
192
193
194
195
196
197
        # Prepare setting save
        could_sync = self.can_sync()
        was_active = not self.syncer.paused
        if could_sync and was_active:
            self.pause_sync()
        must_reset_syncing = self._essentials_changed(new_settings)

        # save settings
198
199
200
        self.settings = new_settings
        self._dump_settings()

201
202
203
204
205
206
207
        # Restart
        if self.can_sync():
            if must_reset_syncing or not could_sync:
                self.init_sync()
            elif was_active:
                self.start_sync()

208
    def pause_sync(self):
209
        self.syncer.pause_decide()
210
211

    def start_sync(self):
212
        self.syncer.start_decide()
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238

    # WebSocket connection methods
    def opened(self):
        """Log that the connection has been established"""
        note = 'Helper: connection established'
        LOG.debug(note)

    def closed(self, *args):
        """Log that the connection has been closed; arguments are ignored"""
        note = 'Helper: connection closed'
        LOG.debug(note)

    def send_json(self, msg):
        LOG.debug('send: %s' % msg)
        self.send(json.dumps(msg))

    # Protocol handling methods
    def _post(self, r):
        """Handle POST requests"""
        if self.accepted:
            action = r['path']
            if action == 'shutdown':
                self.close()
                return
            {
                'start': self.start_sync,
                'pause': self.pause_sync
            }[action]()
            self.send_json({'OK': 200, 'action': 'post %s' % action})
        elif r['gui_id'] == self.gui_id:
239
            self._load_settings()
240
241
242
            if self.can_sync():
                self.init_sync()
                self.pause_sync()
243
244
245
246
247
248
249
250
251
            self.accepted = True
            self.send_json({'ACCEPTED': 202, 'action': 'post gui_id'})
        else:
            action = r.get('path', 'gui_id')
            self.send_json({'REJECTED': 401, 'action': 'post %s' % action})
            self.terminate()

    def _put(self, r):
        """Handle PUT requests"""
252
        if self.accepted:
253
254
255
256
257
            LOG.debug('put %s' % r)
            action = r.pop('path')
            self.set_settings(r)
            r.update({'CREATED': 201, 'action': 'put %s' % action})
            self.send_json(r)
258
259
260
261
        else:
            action = r['path']
            self.send_json({'UNAUTHORIZED': 401, 'action': 'put %s' % action})
            self.terminate()
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299

    def _get(self, r):
        """Handle GET requests"""
        action = r.pop('path')
        if not self.accepted:
            self.send_json({'UNAUTHORIZED': 401, 'action': 'get %s' % action})
            self.terminate()
        else:
            data = {
                'settings': self.get_settings,
                'status': self.get_status,
            }[action]()
            data['action'] = 'get %s' % action
            self.send_json(data)

    def received_message(self, message):
        """Route requests to corresponding handling methods"""
        LOG.debug('recv: %s' % message)
        try:
            r = json.loads('%s' % message)
        except ValueError as ve:
            self.send_json({'BAD REQUEST': 400})
            LOG.error('JSON ERROR: %s' % ve)
            return
        try:
            method = r.pop('method')
            {
                'post': self._post,
                'put': self._put,
                'get': self._get
            }[method](r)
        except KeyError as ke:
            self.send_json({'BAD REQUEST': 400})
            LOG.error('KEY ERROR: %s' % ke)
        except Exception as e:
            self.send_json({'INTERNAL ERROR': 500})
            LOG.error('EXCEPTION: %s' % e)
            self.terminate()