protocol.py 9.66 KB
Newer Older
1
2
3
4
from ws4py.websocket import WebSocket
import json
import logging
from os.path import abspath
5
from agkyra.syncer import syncer, setup, pithos_client, localfs_client
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
from agkyra.config import AgkyraConfig


# Module-level logger, named after this module.
LOG = logging.getLogger(__name__)


class WebSocketProtocol(WebSocket):
    """Helper-side WebSocket protocol for communication with GUI:

    -- INTERNAL HANDSHAKE --
    GUI: {"method": "post", "gui_id": <GUI ID>}
    HELPER: {"ACCEPTED": 202, "action": "post gui_id"} or
        {"REJECTED": 401, "action": "post gui_id"}

    -- SHUT DOWN --
    GUI: {"method": "post", "path": "shutdown"}

    -- PAUSE --
    GUI: {"method": "post", "path": "pause"}
    HELPER: {"OK": 200, "action": "post pause"} or error

    -- START --
    GUI: {"method": "post", "path": "start"}
    HELPER: {"OK": 200, "action": "post start"} or error

    -- GET SETTINGS --
    GUI: {"method": "get", "path": "settings"}
    HELPER:
        {
            "action": "get settings",
            "token": <user token>,
            "url": <auth url>,
            "container": <container>,
            "directory": <local directory>,
            "exclude": <file path>
        } or {<ERROR>: <ERROR CODE>}

    -- PUT SETTINGS --
    GUI: {
            "method": "put", "path": "settings",
            "token": <user token>,
            "url": <auth url>,
            "container": <container>,
            "directory": <local directory>,
            "exclude": <file path>
        }
    HELPER: {"CREATED": 201, "action": "put settings",
             <the settings sent by the GUI, echoed back>} or
        {<ERROR>: <ERROR CODE>, "action": "put settings"}

    -- GET STATUS --
    GUI: {"method": "get", "path": "status"}
    HELPER: {
        "can_sync": <boolean>,
        "progress": <int>,
        "paused": <boolean>,
        "action": "get status"} or
        {<ERROR>: <ERROR CODE>, "action": "get status"}

    -- ERRORS --
    Malformed requests are answered with {"BAD REQUEST": 400}.
    "put" and "get" requests sent before the handshake are answered with
    {"UNAUTHORIZED": 401, "action": <action>} and the connection is
    terminated. Unexpected failures are answered with
    {"INTERNAL ERROR": 500} and the connection is terminated.
    """

    # Identifier the GUI must present in the handshake.
    gui_id = None
    # True once the handshake has succeeded on this connection.
    accepted = False
    # NOTE(review): `settings` and `status` are class-level dicts which are
    # mutated in place by _load_settings() and get_status(), so they are
    # shared by all instances until set_settings() rebinds `settings` on the
    # instance -- confirm that this sharing is intended.
    settings = dict(
        token=None, url=None,
        container=None, directory=None,
        exclude=None)
    status = dict(progress=0, paused=True, can_sync=False)
    # Not used inside this class; kept for backward compatibility.
    file_syncer = None
    # The active FileSyncer, or None until init_sync() has run.
    syncer = None
    cnf = AgkyraConfig()
    # Settings that must all be set before syncing can be set up.
    essentials = ('url', 'token', 'container', 'directory')

    def _get_default_sync(self):
        """Get global.default_sync or pick the first sync as default
        If there are no syncs, create a 'default' sync.

        :returns: the name of the sync to use
        """
        sync = self.cnf.get('global', 'default_sync')
        if not sync:
            # Fall back to the first configured sync, if any
            sync = next(iter(self.cnf.keys('sync')), None) or 'default'
            self.cnf.set('global', 'default_sync', sync)
        return sync

    def _get_sync_cloud(self, sync):
        """Get the <sync>.cloud or pick the first cloud and use it
        In case of cloud picking, set the cloud as the <sync>.cloud for future
        sessions.
        If no clouds are found, create a 'default' cloud, with an empty url.

        :param sync: the name of the sync whose cloud is requested
        :returns: the name of the cloud to use
        """
        try:
            cloud = self.cnf.get_sync(sync, 'cloud')
        except KeyError:
            cloud = None
        if not cloud:
            # Fall back to the first configured cloud, if any
            cloud = next(iter(self.cnf.keys('cloud')), None) or 'default'
            self.cnf.set_sync(sync, 'cloud', cloud)
        return cloud

    def _load_settings(self):
        """Load the settings of the default sync from the configuration

        Cloud options (url, token) which cannot be retrieved are reset to
        None. Sync options (container, directory, exclude) which are not set
        keep their current value.
        """
        LOG.debug('Start loading settings')
        sync = self._get_default_sync()
        cloud = self._get_sync_cloud(sync)

        for option in ('url', 'token'):
            try:
                self.settings[option] = self.cnf.get_cloud(cloud, option)
            except Exception:
                # Best effort: reset only the option that failed to load
                self.settings[option] = None

        for option in ('container', 'directory', 'exclude'):
            try:
                self.settings[option] = self.cnf.get_sync(sync, option)
            except KeyError:
                LOG.debug('No %s is set', option)

        LOG.debug('Finished loading settings')

    def _dump_settings(self):
        """Save the current settings to the configuration

        Nothing is saved if no url is set. If the url differs from the url
        of the cloud currently used by the sync, the settings are stored
        under a cloud name which is not in use yet, so that the existing
        cloud is not overwritten.
        """
        LOG.debug('Saving settings')
        new_url = self.settings.get('url', None)
        if not new_url:
            LOG.debug('No settings to save')
            return

        sync = self._get_default_sync()
        cloud = self._get_sync_cloud(sync)

        try:
            old_url = self.cnf.get_cloud(cloud, 'url') or ''
        except KeyError:
            # The cloud does not exist yet, so its name can be used as is
            old_url = new_url

        # The condition does not change inside the loop: the loop is left
        # as soon as a cloud name is found which is not in the configuration
        while old_url != new_url:
            cloud = '%s_%s' % (cloud, sync)
            try:
                self.cnf.get_cloud(cloud, 'url')
            except KeyError:
                break

        self.cnf.set_cloud(cloud, 'url', new_url)
        self.cnf.set_cloud(cloud, 'token', self.settings.get('token') or '')
        self.cnf.set_sync(sync, 'cloud', cloud)

        for option in ('directory', 'container', 'exclude'):
            self.cnf.set_sync(sync, option, self.settings.get(option) or '')

        self.cnf.write()
        LOG.debug('Settings saved')

    def can_sync(self):
        """Check if settings are enough to setup a syncing process

        :returns: True if all essential settings are set, False otherwise
        """
        # A missing key counts as an unset value
        return all(self.settings.get(e) for e in self.essentials)

    def _essentials_changed(self, new_settings):
        """Check if essential settings have changed in new_settings

        :param new_settings: (dict) the settings to compare against the
            current ones
        :returns: True if at least one essential setting differs
        """
        return any(
            new_settings.get(e) != self.settings.get(e)
            for e in self.essentials)

    def init_sync(self):
        """Initialize syncer

        Create a syncer from the current settings, store it as self.syncer,
        probe and sync everything and launch the syncer daemons.
        """
        sync = self._get_default_sync()
        # NOTE(review): ignore_ssl=True presumably disables SSL certificate
        # verification -- confirm this is intended
        syncer_settings = setup.SyncerSettings(
            sync,
            self.settings['url'], self.settings['token'],
            self.settings['container'], self.settings['directory'],
            ignore_ssl=True)
        master = pithos_client.PithosFileClient(syncer_settings)
        slave = localfs_client.LocalfsFileClient(syncer_settings)
        self.syncer = syncer.FileSyncer(syncer_settings, master, slave)
        self.syncer_settings = syncer_settings
        self.syncer.probe_and_sync_all()
        self.syncer.launch_daemons()

    # Syncer-related methods
    def get_status(self):
        """Refresh and return the status

        :returns: (dict) the status, with keys paused, progress, can_sync
        """
        can_sync = self.can_sync()
        has_syncer = self.syncer is not None
        if can_sync and has_syncer:
            LOG.debug('::::::::: %s', self.syncer.get_next_message())
        # Without a syncer nothing is running: report it as paused
        self.status['paused'] = self.syncer.paused if has_syncer else True
        # Progress is a hard-coded value
        self.status['progress'] = 50
        self.status['can_sync'] = can_sync
        return self.status

    def get_settings(self):
        """:returns: (dict) the current settings"""
        return self.settings

    def set_settings(self, new_settings):
        """Replace the settings, save them and restart syncing if needed

        :param new_settings: (dict) the settings to apply. The dict is
            copied, so later changes by the caller do not affect the
            settings.
        """
        # Prepare setting save
        could_sync = self.can_sync()
        was_active = (
            could_sync and
            self.syncer is not None and
            not self.syncer.paused)
        if was_active:
            self.pause_sync()
        must_reset_syncing = self._essentials_changed(new_settings)

        # save settings
        self.settings = dict(new_settings)
        self._dump_settings()

        # Restart
        if self.can_sync():
            if must_reset_syncing or not could_sync or self.syncer is None:
                self.init_sync()
            elif was_active:
                self.start_sync()

    def pause_sync(self):
        """Pause syncing

        :raises AttributeError: if no syncer has been initialized
        """
        self.syncer.pause_decide()

    def start_sync(self):
        """Start (or resume) syncing

        :raises AttributeError: if no syncer has been initialized
        """
        self.syncer.start_decide()

    # WebSocket connection methods
    def opened(self):
        """Log that the connection is established"""
        LOG.debug('Helper: connection established')

    def closed(self, *args):
        """Log that the connection is closed"""
        LOG.debug('Helper: connection closed')

    def send_json(self, msg):
        """Send msg to the GUI, serialized as JSON

        :param msg: a JSON-serializable object
        """
        LOG.debug('send: %s', msg)
        self.send(json.dumps(msg))

    # Protocol handling methods
    def _post(self, r):
        """Handle POST requests

        :param r: (dict) the request, without the "method" key
        :raises KeyError: if an accepted request has no "path" or an
            unknown one
        """
        if self.accepted:
            action = r['path']
            if action == 'shutdown':
                self.close()
                return
            {
                'start': self.start_sync,
                'pause': self.pause_sync
            }[action]()
            self.send_json({'OK': 200, 'action': 'post %s' % action})
        elif 'gui_id' in r and r['gui_id'] == self.gui_id:
            self._load_settings()
            if self.can_sync():
                # Set the syncer up, but wait for the GUI to start it
                self.init_sync()
                self.pause_sync()
            self.accepted = True
            self.send_json({'ACCEPTED': 202, 'action': 'post gui_id'})
        else:
            # Wrong or missing gui_id: reject and drop the connection
            action = r.get('path', 'gui_id')
            self.send_json({'REJECTED': 401, 'action': 'post %s' % action})
            self.terminate()

    def _put(self, r):
        """Handle PUT requests

        :param r: (dict) the request, without the "method" key
        :raises KeyError: if the request has no "path"
        """
        if not self.accepted:
            action = r['path']
            self.send_json({'UNAUTHORIZED': 401, 'action': 'put %s' % action})
            self.terminate()
            return

        LOG.debug('put %s', r)
        action = r.pop('path')
        self.set_settings(r)
        # Reply with a copy, so that the reply keys never end up in the
        # stored settings
        response = dict(r)
        response.update({'CREATED': 201, 'action': 'put %s' % action})
        self.send_json(response)

    def _get(self, r):
        """Handle GET requests

        :param r: (dict) the request, without the "method" key
        :raises KeyError: if the request has no "path" or an unknown one
        """
        action = r.pop('path')
        if not self.accepted:
            self.send_json({'UNAUTHORIZED': 401, 'action': 'get %s' % action})
            self.terminate()
            return

        handler = {
            'settings': self.get_settings,
            'status': self.get_status,
        }[action]
        # Reply with a copy, so that the "action" key is not stored in the
        # settings or the status
        data = dict(handler())
        data['action'] = 'get %s' % action
        self.send_json(data)

    def received_message(self, message):
        """Route requests to corresponding handling methods

        :param message: the received message, expected to be a JSON object
            with a "method" key ("post", "put" or "get")
        """
        LOG.debug('recv: %s', message)
        try:
            r = json.loads('%s' % message)
        except ValueError as ve:
            self.send_json({'BAD REQUEST': 400})
            LOG.error('JSON ERROR: %s', ve)
            return
        if not isinstance(r, dict):
            # Valid JSON, but not a request object
            self.send_json({'BAD REQUEST': 400})
            LOG.error('JSON ERROR: request is not a JSON object')
            return
        try:
            method = r.pop('method')
            {
                'post': self._post,
                'put': self._put,
                'get': self._get
            }[method](r)
        except KeyError as ke:
            self.send_json({'BAD REQUEST': 400})
            LOG.error('KEY ERROR: %s', ke)
        except Exception as e:
            self.send_json({'INTERNAL ERROR': 500})
            LOG.error('EXCEPTION: %s', e)
            self.terminate()