node.py 39.7 KB
Newer Older
Antony Chazapis's avatar
Antony Chazapis committed
1
# Copyright 2011-2012 GRNET S.A. All rights reserved.
2
#
3
4
5
# Redistribution and use in source and binary forms, with or
# without modification, are permitted provided that the following
# conditions are met:
6
#
7
8
9
#   1. Redistributions of source code must retain the above
#      copyright notice, this list of conditions and the following
#      disclaimer.
10
#
11
12
13
14
#   2. Redistributions in binary form must reproduce the above
#      copyright notice, this list of conditions and the following
#      disclaimer in the documentation and/or other materials
#      provided with the distribution.
15
#
16
17
18
19
20
21
22
23
24
25
26
27
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.
28
#
29
30
31
32
33
34
# The views and conclusions contained in the software and
# documentation are those of the authors and should not be
# interpreted as representing official policies, either expressed
# or implied, of GRNET S.A.

from time import time
35
36
from operator import itemgetter
from itertools import groupby
37
38
39

from dbworker import DBWorker

Antony Chazapis's avatar
Antony Chazapis committed
40
from pithos.backends.filter import parse_filters
41

42

43
# Identifier of the root node.
ROOTNODE = 0

# Positions of the columns inside a version row, in the order in which the
# version queries of this module select them.
SERIAL, NODE, HASH, SIZE, TYPE, SOURCE, MTIME, MUSER, UUID, CHECKSUM, \
    CLUSTER = range(11)

# Matching-mode constants.
MATCH_PREFIX, MATCH_EXACT = range(2)

# Default for the ``before`` arguments: no upper bound on mtime.
inf = float('inf')


def strnextling(prefix):
    """Return the first unicode string that is greater than,
       but does not start with, the given prefix.

       The last character of prefix is replaced by its successor:
       strnextling(u'hello') -> u'hellp'

       For the empty prefix u'\\uffff' is returned as an approximation.
       RuntimeError is raised if the last character is u'\\uffff' or above.
    """
    if prefix:
        head, last = prefix[:-1], ord(prefix[-1])
        if last >= 0xffff:
            raise RuntimeError
        return head + unichr(last + 1)

    # Every string starts with the empty string, so there is no exact
    # answer here. The build width (0x10ffff on wide, 0xffff on narrow
    # python builds) is deliberately not autodetected: 0xffff is safe enough.
    return unichr(0xffff)

73

74
def strprevling(prefix):
    """Return an approximation of the last unicode string that is
       less than, but does not start with, the given prefix.

       The last character is replaced by its predecessor followed by
       u'\\uffff': strprevling(u'hello') -> u'helln\\uffff'

       The empty string is returned unchanged, since nothing precedes it.
       A trailing u'\\x00' has no predecessor and is simply dropped.
    """
    if not prefix:
        return prefix
    head, last = prefix[:-1], ord(prefix[-1])
    if last <= 0:
        return head
    return head + unichr(last - 1) + unichr(0xffff)


# Maps every version property name to its position in a version row
# (the position is simply the index of the name in the tuple below).
_propnames = dict(
    (name, position) for position, name in enumerate((
        'serial',
        'node',
        'hash',
        'size',
        'type',
        'source',
        'mtime',
        'muser',
        'uuid',
        'checksum',
        'cluster',
    ))
)


class Node(DBWorker):
105
106
    """Nodes store path organization and have multiple versions.
       Versions store object history and have multiple attributes.
107
108
       Attributes store metadata.
    """
109

110
    # TODO: Provide an interface for included and excluded clusters.
111

112
    def __init__(self, **params):
        """Initialise the underlying DBWorker with params and make sure
           the schema used by this class exists.

           Creates, if missing, the tables nodes, policy, statistics,
           versions and attributes together with their indexes, then
           inserts the root node (ROOTNODE, parent ROOTNODE) unless it is
           already present.
        """
        DBWorker.__init__(self, **params)
        run = self.execute

        # The statements are executed strictly in the order listed: the
        # pragma first, then each table before the indexes and tables that
        # refer to it.
        schema = (
            """ pragma foreign_keys = on """,

            """ create table if not exists nodes
                          ( node       integer primary key,
                            parent     integer default 0,
                            path       text    not null default '',
                            latest_version     integer,
                            foreign key (parent)
                            references nodes(node)
                            on update cascade
                            on delete cascade ) """,
            """ create unique index if not exists idx_nodes_path
                    on nodes(path) """,
            """ create index if not exists idx_nodes_parent
                    on nodes(parent) """,
            """ create index if not exists idx_latest_version
                    on nodes(latest_version) """,

            """ create table if not exists policy
                          ( node   integer,
                            key    text,
                            value  text,
                            primary key (node, key)
                            foreign key (node)
                            references nodes(node)
                            on update cascade
                            on delete cascade ) """,

            """ create table if not exists statistics
                          ( node       integer,
                            population integer not null default 0,
                            size       integer not null default 0,
                            mtime      integer,
                            cluster    integer not null default 0,
                            primary key (node, cluster)
                            foreign key (node)
                            references nodes(node)
                            on update cascade
                            on delete cascade ) """,

            """ create table if not exists versions
                          ( serial     integer primary key,
                            node       integer,
                            hash       text,
                            size       integer not null default 0,
                            type       text    not null default '',
                            source     integer,
                            mtime      integer,
                            muser      text    not null default '',
                            uuid       text    not null default '',
                            checksum   text    not null default '',
                            cluster    integer not null default 0,
                            foreign key (node)
                            references nodes(node)
                            on update cascade
                            on delete cascade ) """,
            """ create index if not exists idx_versions_node_mtime
                    on versions(node, mtime) """,
            # NOTE: despite its name this index covers only the uuid column.
            """ create index if not exists idx_versions_node_uuid
                    on versions(uuid) """,

            """ create table if not exists attributes
                          ( serial      integer,
                            domain      text,
                            key         text,
                            value       text,
                            node        integer not null    default 0,
                            is_latest   boolean not null    default 1,
                            primary key (serial, domain, key)
                            foreign key (serial)
                            references versions(serial)
                            on update cascade
                            on delete cascade ) """,
            """ create index if not exists idx_attributes_domain
                    on attributes(domain) """,
            """ create index if not exists idx_attributes_serial_node
                    on attributes(serial, node) """,
        )
        for statement in schema:
            run(statement)

        # Seed the root node. wrapper.execute() / wrapper.commit() bracket
        # the insert (presumably a transaction -- confirm against the
        # wrapper); the commit is issued even if the insert raises.
        txn = self.wrapper
        txn.execute()
        try:
            run("insert or ignore into nodes(node, parent) values (?, ?)",
                (ROOTNODE, ROOTNODE))
        finally:
            txn.commit()
201

Antony Chazapis's avatar
Antony Chazapis committed
202
203
204
205
    def node_create(self, parent, path):
        """Insert a new node with the given parent and path.
           Return the row id assigned to it, i.e. its node identifier.
        """

        cursor = self.execute(
            "insert into nodes (parent, path) values (?, ?)",
            (parent, path))
        return cursor.lastrowid
211

212
    def node_lookup(self, path, for_update=False):
        """Return the identifier of the node stored at the given path,
           or None if there is no such node.

           for_update is not used here: it is accepted for conformance.
        """

        self.execute("select node from nodes where path = ?", (path,))
        row = self.fetchone()
        return None if row is None else row[0]
225

Sofia Papagiannaki's avatar
Sofia Papagiannaki committed
226
    def node_lookup_bulk(self, paths):
        """Return a list with the identifiers of the nodes stored at the
           given paths. Paths without a node contribute nothing, so the
           list may be shorter than paths (or empty).
        """

        marks = ','.join(['?' for _ in paths])
        self.execute(
            "select node from nodes where path in (%s)" % marks, paths)
        rows = self.fetchall()
        if rows is None:
            return None
        return [node for (node,) in rows]
238

Antony Chazapis's avatar
Antony Chazapis committed
239
240
241
242
    def node_get_properties(self, node):
        """Return the (parent, path) row of the given node,
           or None if there is no such node.
        """

        query = "select parent, path from nodes where node = ?"
        params = (node,)
        self.execute(query, params)
        return self.fetchone()
247

Antony Chazapis's avatar
Antony Chazapis committed
248
249
250
    def node_get_versions(self, node, keys=(), propnames=_propnames):
        """Return the properties of every version stored at node.

           With empty keys each row holds all properties, in the order
           (serial, node, hash, size, type, source, mtime, muser, uuid,
            checksum, cluster).
           Otherwise each row is a list with the values of the requested
           keys, in the order given; keys missing from propnames are
           silently skipped.
        """

        self.execute(
            "select serial, node, hash, size, type, source, mtime, muser, "
            "uuid, checksum, cluster "
            "from versions "
            "where node = ?",
            (node,))
        rows = self.fetchall()
        if rows is None or not keys:
            return rows

        # Resolve the requested names to column positions once.
        columns = [propnames[key] for key in keys if key in propnames]
        return [[row[column] for column in columns] for row in rows]
267

Antony Chazapis's avatar
Antony Chazapis committed
268
    def node_count_children(self, node):
        """Return the number of nodes whose parent is the given node.
           Node 0 is never counted.
        """

        self.execute(
            "select count(node) from nodes where parent = ? and node != 0",
            (node,))
        row = self.fetchone()
        return row[0] if row is not None else 0
277

278
279
    def node_purge_children(self, parent, before=inf, cluster=0,
                            update_statistics_ancestors_depth=None):
        """Delete the versions of parent's children that belong to the
           given cluster and have mtime <= before.

           Return (hashes, size, serials) of the deleted versions, or
           ((), 0, ()) if nothing matched. The statistics of parent and of
           its ancestors are decreased accordingly before the deletion.
           Children left without any version are removed as well.
        """

        run = self.execute
        # The same selection is used to count, list and delete.
        scope = ("where node in (select node "
                 "from nodes "
                 "where parent = ?) "
                 "and cluster = ? "
                 "and mtime <= ?")
        params = (parent, cluster, before)

        run("select count(serial), sum(size) from versions " + scope, params)
        count, size = self.fetchone()
        if not count:
            return (), 0, ()

        now = time()
        self.statistics_update(parent, -count, -size, now, cluster)
        self.statistics_update_ancestors(parent, -count, -size, now, cluster,
                                         update_statistics_ancestors_depth)

        run("select hash, serial from versions " + scope, params)
        hashes, serials = [], []
        for version_hash, serial in self.fetchall():
            hashes.append(version_hash)
            serials.append(serial)

        run("delete from versions " + scope, params)

        # Drop the children that no longer have any version.
        run("delete from nodes "
            "where node in (select node from nodes n "
            "where (select count(serial) "
            "from versions "
            "where node = n.node) = 0 "
            "and parent = ?)",
            (parent,))
        return hashes, size, serials
331

332
333
    def node_purge(self, node, before=inf, cluster=0,
                   update_statistics_ancestors_depth=None):
        """Delete the versions of node that belong to the given cluster
           and have mtime <= before.

           Return (hashes, size, serials) of the deleted versions, or
           ((), 0, ()) if nothing matched. The statistics of the node's
           ancestors are decreased accordingly before the deletion.
           The node itself is removed if it is left without versions.
        """

        run = self.execute
        # The same selection is used to count, list and delete.
        scope = ("where node = ? "
                 "and cluster = ? "
                 "and mtime <= ?")
        params = (node, cluster, before)

        run("select count(serial), sum(size) from versions " + scope, params)
        count, size = self.fetchone()
        if not count:
            return (), 0, ()

        now = time()
        self.statistics_update_ancestors(node, -count, -size, now, cluster,
                                         update_statistics_ancestors_depth)

        run("select hash, serial from versions " + scope, params)
        hashes, serials = [], []
        for version_hash, serial in self.fetchall():
            hashes.append(version_hash)
            serials.append(serial)

        run("delete from versions " + scope, params)

        # Drop the node if it no longer has any version.
        run("delete from nodes "
            "where node in (select node from nodes n "
            "where (select count(serial) "
            "from versions "
            "where node = n.node) = 0 "
            "and node = ?)",
            (node,))
        return hashes, size, serials
378

379
    def node_remove(self, node, update_statistics_ancestors_depth=None):
        """Remove the given node.

           Return False, without touching anything, if the node has
           children. Otherwise subtract the node's versions, cluster by
           cluster, from the statistics of its ancestors, delete the node
           and return True. The outcome of the delete is not checked, so
           True is also returned when no such node exists.
        """

        if self.node_count_children(node):
            return False

        now = time()
        self.execute(
            "select count(serial), sum(size), cluster "
            "from versions "
            "where node = ? "
            "group by cluster",
            (node,))
        per_cluster = self.fetchall()
        for population, size, cluster in per_cluster:
            self.statistics_update_ancestors(
                node, -population, -size, now, cluster,
                update_statistics_ancestors_depth)

        self.execute("delete from nodes where node = ?", (node,))
        return True
401

402
    def node_accounts(self, accounts=()):
        """Return the (path, node) rows of the nodes directly under
           node 0, node 0 itself excluded.
           If accounts is not empty, only rows whose path is listed in it
           are returned.
        """
        parts = ["select path, node from nodes where node != 0 and parent = 0 "]
        params = []
        if accounts:
            marks = ','.join('?' for _ in accounts)
            parts.append("and path in (%s)" % marks)
            params += accounts
        return self.execute(''.join(parts), params).fetchall()

411
412
413
    def node_account_quotas(self):
        """Return a dict mapping the path of every node directly under
           node 0 (node 0 excluded) to the value of its 'quota' policy.
           Nodes without a 'quota' policy entry are left out.
        """
        query = ("select n.path, p.value from nodes n, policy p "
                 "where n.node != 0 and n.parent = 0 "
                 "and n.node = p.node and p.key = 'quota'")
        rows = self.execute(query).fetchall()
        return dict(rows)

417
418
    def node_account_usage(self, account=None, cluster=0):
        """Return usage per account as a dict {account path: total size}.

        The total is the sum of the sizes of the versions in the given
        cluster that sit exactly two levels below the account node, an
        account node being a node other than 0 whose parent is node 0.
        Accounts with no such version are absent from the result.

        Keyword arguments:
        account -- (default None: list usage for all the accounts)
        cluster -- list current, history or deleted usage (default 0: normal)
        """

        q = ("select n3.path, sum(v.size) from "
             "versions v, nodes n1, nodes n2, nodes n3 "
             "where v.node = n1.node "
             "and v.cluster = ? "
             "and n1.parent = n2.node "
             "and n2.parent = n3.node "
             "and n3.parent = 0 "
             "and n3.node != 0 ")
        args = [cluster]
        if account:
            # Restrict the report to a single account.
            q += "and n3.path = ? "
            args.append(account)
        q += "group by n3.path"

        self.execute(q, args)
        return dict(self.fetchall())
442

443
444
445
446
    def policy_get(self, node):
        """Return the policy of the given node as a {key: value} dict
           (empty if the node has no policy entries).
        """
        self.execute("select key, value from policy where node = ?", (node,))
        entries = self.fetchall()
        return dict(entries)
447

448
449
450
    def policy_set(self, node, policy):
        """Store every (key, value) pair of the policy mapping for the
           given node, replacing the value of keys that already exist.
           Keys of the node that are not mentioned in policy are kept.
        """
        q = "insert or replace into policy (node, key, value) values (?, ?, ?)"
        # items() rather than iteritems(): same pairs, but it is available
        # on both Python 2 and Python 3 mappings.
        self.executemany(q, [(node, k, v) for k, v in policy.items()])
451

452
453
454
455
    def statistics_get(self, node, cluster=0):
        """Return the (population, size, mtime) recorded in the statistics
           table for the given node and cluster, or None if there is no
           such record.
        """

        self.execute(
            "select population, size, mtime from statistics "
            "where node = ? and cluster = ?",
            (node, cluster))
        return self.fetchone()
461

462
463
464
465
466
467
    def statistics_update(self, node, population, size, mtime, cluster=0):
        """Add population and size to the statistics record of the given
           node and cluster, and set its mtime.

           Both deltas may be zero, positive or negative. A missing record
           counts as (0, 0). The resulting population is never stored
           below zero; the resulting size is stored as computed.
        """

        self.execute(
            "select population, size from statistics "
            "where node = ? and cluster = ?",
            (node, cluster))
        current = self.fetchone()
        old_population, old_size = (0, 0) if current is None else current

        new_population = max(population + old_population, 0)
        new_size = size + old_size
        self.execute(
            "insert or replace into statistics "
            "(node, population, size, mtime, cluster) "
            "values (?, ?, ?, ?, ?)",
            (node, new_population, new_size, mtime, cluster))
484

485
486
    def statistics_update_ancestors(self, node, population, size, mtime,
                                    cluster=0, recursion_depth=None):
        """Update the statistics of the given node's parent, then walk up
           and update every further ancestor until the root is reached.

           Population is not recursive: it is applied to the direct parent
           only, the ancestors above it receive size and mtime alone.
           A truthy recursion_depth limits the number of levels updated;
           None (or 0) means no limit. The walk also stops at a node that
           cannot be found.
        """

        levels = 0
        while node != ROOTNODE:
            if recursion_depth and recursion_depth <= levels:
                break
            props = self.node_get_properties(node)
            if props is None:
                break
            parent, _ = props
            self.statistics_update(parent, population, size, mtime, cluster)
            # Continue from the parent; population counts for one level only.
            node, population = parent, 0
            levels += 1
506

Antony Chazapis's avatar
Antony Chazapis committed
507
508
509
510
511
    def statistics_latest(self, node, before=inf, except_cluster=0):
        """Return (population, total size, last mtime) for the latest
           versions under node that do not belong to except_cluster.

           Return None if the node does not exist or if its own latest
           version is missing or belongs to except_cluster.
           The population counts direct children only; size and mtime
           cover every node whose path starts with the node's path, with
           the size of the node's own latest version subtracted.
        """

        run = self.execute
        one = self.fetchone

        # The node.
        node_props = self.node_get_properties(node)
        if node_props is None:
            return None
        _, path = node_props

        # The latest version of the node itself.
        subq, args = self._construct_latest_version_subquery(
            node=node, before=before)
        run(("select serial, node, hash, size, type, source, mtime, muser, "
             "uuid, checksum, cluster "
             "from versions v "
             "where serial = %s "
             "and cluster != ?") % subq,
            args + [except_cluster])
        version = one()
        if version is None:
            return None
        mtime = version[MTIME]

        # Common head of the two aggregate queries below; each one only
        # adds the condition that selects the nodes to look at.
        aggregate = ("select count(serial), sum(size), max(mtime) "
                     "from versions v "
                     "where serial = %s "
                     "and cluster != ? "
                     "and node in (select node "
                     "from nodes ")

        # First level, just under node (get population).
        subq, args = self._construct_latest_version_subquery(
            node=None, before=before)
        run((aggregate + "where parent = ?)") % subq,
            args + [except_cluster, node])
        children = one()
        if children is None:
            return None
        count = children[0]
        mtime = max(mtime, children[2])
        if count == 0:
            return (0, 0, mtime)

        # All descendants (get size and mtime).
        # This is why the full path is stored.
        subq, args = self._construct_latest_version_subquery(
            node=None, before=before)
        run((aggregate + "where path like ? escape '\\')") % subq,
            args + [except_cluster, self.escape_like(path) + '%'])
        subtree = one()
        if subtree is None:
            return None
        # The path pattern also matches the node itself: leave it out.
        size = subtree[1] - version[SIZE]
        mtime = max(mtime, subtree[2])
        return (count, size, mtime)
574

575
    def nodes_set_latest_version(self, node, serial):
        """Record serial as the latest version of the given node."""
        self.execute(
            "update nodes set latest_version = ? where node = ?",
            (serial, node))
579

580
581
582
    def version_create(self, node, hash, size, type, source, muser, uuid,
                       checksum, cluster=0,
                       update_statistics_ancestors_depth=None):
        """Store a new version of node with the given properties, stamped
           with the current time.

           The statistics of the node's ancestors are increased by one
           version of the given size, and the new version is recorded as
           the node's latest one.
           Return the (serial, mtime) of the new version.
        """

        now = time()
        cursor = self.execute(
            "insert into versions (node, hash, size, type, source, mtime, "
            "muser, uuid, checksum, cluster) "
            "values (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)",
            (node, hash, size, type, source, now, muser,
             uuid, checksum, cluster))
        serial = cursor.lastrowid

        self.statistics_update_ancestors(node, 1, size, now, cluster,
                                         update_statistics_ancestors_depth)
        self.nodes_set_latest_version(node, serial)
        return serial, now
600

601
    def version_lookup(self, node, before=inf, cluster=0, all_props=True):
        """Look up the current version of the given node.

           Return its row of properties
           (serial, node, hash, size, type, source, mtime,
            muser, uuid, checksum, cluster)
           or, if all_props is false, a row holding only the serial.
           Return None if the current version is not found in the given
           cluster.
        """

        template = ("select %s "
                    "from versions v "
                    "where serial = %s "
                    "and cluster = ?")
        subq, args = self._construct_latest_version_subquery(
            node=node, before=before)
        if all_props:
            columns = ("serial, node, hash, size, type, source, mtime, muser, "
                       "uuid, checksum, cluster")
        else:
            columns = "serial"

        self.execute(template % (columns, subq), args + [cluster])
        return self.fetchone()
Sofia Papagiannaki's avatar
Sofia Papagiannaki committed
627

Sofia Papagiannaki's avatar
Sofia Papagiannaki committed
628
    def version_lookup_bulk(self, nodes, before=inf, cluster=0,
                            all_props=True, order_by_path=False):
        """Look up the current versions of the given nodes that belong to
           the given cluster.

           Return a list of rows
           (serial, node, hash, size, type, source, mtime, muser, uuid,
            checksum, cluster)
           or, if all_props is false, rows holding only the serial.
           Return () if nodes is empty.
           order_by_path sorts the rows by node path; it has an effect
           only when all_props is true.
        """

        if not nodes:
            return ()

        template = ("select %s "
                    "from versions v, nodes n "
                    "where serial in %s "
                    "and v.node = n.node "
                    "and cluster = ? %s ")
        subq, args = self._construct_latest_versions_subquery(
            nodes=nodes, before=before)
        if all_props:
            columns = ("serial, v.node, hash, size, type, source, mtime, "
                       "muser, uuid, checksum, cluster")
            ordering = "order by path" if order_by_path else ""
        else:
            columns = "serial"
            ordering = ''

        args += [cluster]
        self.execute(template % (columns, subq, ordering), args)
        return self.fetchall()
656

657
658
    def version_get_properties(self, serial, keys=(), propnames=_propnames,
                               node=None):
        """Return the properties of the version with the given serial.

           With empty keys the whole row is returned, in the order
           (serial, node, hash, size, type, source, mtime, muser, uuid,
            checksum, cluster).
           Otherwise a list with the values of the requested keys is
           returned, in the order given; keys missing from propnames are
           silently skipped.
           If node is given, the version must also belong to that node.
           Return None if no version matches.
        """

        query = ("select serial, node, hash, size, type, source, mtime, muser, "
                 "uuid, checksum, cluster "
                 "from versions "
                 "where serial = ? ")
        params = [serial]
        if node is not None:
            query += "and node = ?"
            params.append(node)
        self.execute(query, params)

        row = self.fetchone()
        if row is None or not keys:
            return row
        return [row[propnames[key]] for key in keys if key in propnames]
682

683
684
    def version_put_property(self, serial, key, value):
        """Update one property of the version identified by serial.

           Keys that are not listed in _propnames are silently ignored.
        """

        # The column name is interpolated into the statement text, so only
        # keys found in _propnames are allowed to reach it.
        if key in _propnames:
            q = "update versions set %s = ? where serial = ?" % key
            self.execute(q, (value, serial))
690

691
692
    def version_recluster(self, serial, cluster,
                          update_statistics_ancestors_depth=None):
        """Move the version identified by serial into another cluster.

           Nothing happens if the version cannot be found or if it already
           is in the requested cluster. Otherwise the ancestor statistics
           are updated (one version and its size leave the old cluster and
           enter the new one) before the version row itself is changed.
        """

        props = self.version_get_properties(serial)
        if not props:
            return
        node = props[NODE]
        size = props[SIZE]
        previous = props[CLUSTER]
        if previous == cluster:
            return

        # Both statistics updates share the same timestamp.
        now = time()
        depth = update_statistics_ancestors_depth
        for count, amount, target in ((-1, -size, previous),
                                      (1, size, cluster)):
            self.statistics_update_ancestors(node, count, amount, now,
                                             target, depth)

        self.execute("update versions set cluster = ? where serial = ?",
                     (cluster, serial))
712

713
    def version_remove(self, serial, update_statistics_ancestors_depth=None):
        """Remove the version identified by serial.

           Return a (hash, size) pair for the removed version, or None if
           no such version exists. The ancestor statistics of the version's
           cluster are decreased, the row is deleted and, if the node still
           has a version in that cluster, it becomes the node's latest
           version.
        """

        removed = self.version_get_properties(serial)
        if not removed:
            return
        node = removed[NODE]
        digest = removed[HASH]
        size = removed[SIZE]
        cluster = removed[CLUSTER]

        self.statistics_update_ancestors(node, -1, -size, time(), cluster,
                                         update_statistics_ancestors_depth)

        self.execute("delete from versions where serial = ?", (serial,))

        # Re-point the node at whatever version is now current, if any.
        remaining = self.version_lookup(node, cluster=cluster,
                                        all_props=False)
        if remaining:
            self.nodes_set_latest_version(node, remaining[0])
        return digest, size
735

736
    def attribute_get(self, serial, domain, keys=()):
        """Return a list of (key, value) pairs of the specific version.

           Only attributes stored under the given domain are considered.
           If keys is empty, return all attributes.
           Otherwise, return only those specified.
        """

        # Normalise to a tuple once: callers may pass a list (or another
        # iterable), which cannot be concatenated with the
        # (serial, domain) tuple below.
        keys = tuple(keys) if keys else ()
        if keys:
            marks = ','.join('?' for _ in keys)
            q = ("select key, value from attributes "
                 "where key in (%s) and serial = ? and domain = ?" % (marks,))
            self.execute(q, keys + (serial, domain))
        else:
            q = ("select key, value from attributes where "
                 "serial = ? and domain = ?")
            self.execute(q, (serial, domain))
        return self.fetchall()
754

755
    def attribute_set(self, serial, domain, node, items, is_latest=True):
        """Store attributes for the version identified by serial.

           items is an iterable of (key, value) pairs. Each pair is written
           with "insert or replace", together with the given domain, node
           and is_latest values.
        """

        q = ("insert or replace into attributes "
             "(serial, domain, node, is_latest, key, value) "
             "values (?, ?, ?, ?, ?, ?)")
        # Columns shared by every row; key and value are appended per item.
        common = (serial, domain, node, is_latest)
        rows = (common + (key, value) for key, value in items)
        self.executemany(q, rows)
765

766
    def attribute_del(self, serial, domain, keys=()):
        """Remove attributes of the version identified by serial.

           Only attributes of the given domain are affected. With empty
           keys, every attribute is removed; otherwise just the listed
           keys are.
        """

        if not keys:
            self.execute(
                "delete from attributes where serial = ? and domain = ?",
                (serial, domain))
            return

        q = ("delete from attributes "
             "where serial = ? and domain = ? and key = ?")
        rows = ((serial, domain, name) for name in keys)
        self.executemany(q, rows)
779

Antony Chazapis's avatar
Antony Chazapis committed
780
    def attribute_copy(self, source, dest):
        """Copy the attributes of version source to version dest.

           Every attribute row of source is re-inserted ("insert or
           replace") with dest as its serial; domain, node, is_latest, key
           and value are carried over unchanged.
        """

        carried = "domain, node, is_latest, key, value"
        q = ("insert or replace into attributes "
             "(serial, %s) "
             "select ?, %s from attributes "
             "where serial = ?") % (carried, carried)
        self.execute(q, (dest, source))
786

787
788
789
790
791
    def attribute_unset_is_latest(self, node, exclude):
        """Clear the is_latest flag on the attributes of the given node.

           Attributes whose serial equals exclude are left untouched.
        """

        self.execute(
            "update attributes set is_latest = 0 "
            "where node = ? and serial != ?",
            (node, exclude))

792
793
    def _construct_filters(self, domain, filterq):
        if not domain or not filterq:
794
            return None, None
795

796
797
        subqlist = []
        append = subqlist.append
798
        included, excluded, opers = parse_filters(filterq)
799
        args = []
800

801
        if included:
Sofia Papagiannaki's avatar
Sofia Papagiannaki committed
802
803
            subq = ("exists (select 1 from attributes where serial = v.serial "
                    "and domain = ? and ")
804
805
806
            subq += "(" + ' or '.join(('key = ?' for x in included)) + ")"
            subq += ")"
            args += [domain]
807
808
            args += included
            append(subq)
809

810
        if excluded:
Sofia Papagiannaki's avatar
Sofia Papagiannaki committed
811
812
            subq = ("not exists (select 1 from attributes where "
                    "serial = v.serial and domain = ? and ")
813
814
815
            subq += "(" + ' or '.join(('key = ?' for x in excluded)) + ")"
            subq += ")"
            args += [domain]
816
817
            args += excluded
            append(subq)
818

819
820
        if opers:
            for k, o, v in opers:
Sofia Papagiannaki's avatar
Sofia Papagiannaki committed
821
822
                subq = ("exists (select 1 from attributes where "
                        "serial = v.serial and domain = ? and ")
Antony Chazapis's avatar
Antony Chazapis committed
823
824
825
826
                subq += "key = ? and value %s ?" % (o,)
                subq += ")"
                args += [domain, k, v]
                append(subq)
827

828
829
        if not subqlist:
            return None, None
830

831
        subq = ' and ' + ' and '.join(subqlist)
832

833
        return subq, args
834

835
836
837
    def _construct_paths(self, pathq):
        if not pathq:
            return None, None
838

839
840
841
842
843
844
845
846
847
        subqlist = []
        args = []
        for path, match in pathq:
            if match == MATCH_PREFIX:
                subqlist.append("n.path like ? escape '\\'")
                args.append(self.escape_like(path) + '%')
            elif match == MATCH_EXACT:
                subqlist.append("n.path = ?")
                args.append(path)
848

849
850
        subq = ' and (' + ' or '.join(subqlist) + ')'
        args = tuple(args)
851

852
        return subq, args
853

854
855
856
    def _construct_size(self, sizeq):
        if not sizeq or len(sizeq) != 2:
            return None, None
857

858
859
860
861
862
863
864
865
        subq = ''
        args = []
        if sizeq[0]:
            subq += " and v.size >= ?"
            args += [sizeq[0]]
        if sizeq[1]:
            subq += " and v.size < ?"
            args += [sizeq[1]]
866

867
        return subq, args
868

869
870
871
872
873
874
    def _construct_versions_nodes_latest_version_subquery(self, before=inf):
        if before == inf:
            q = ("n.latest_version ")
            args = []
        else:
            q = ("(select max(serial) "
875
876
                 "from versions "
                 "where node = v.node and mtime < ?) ")
877
878
            args = [before]
        return q, args
879

880
881
882
883
884
885
    def _construct_latest_version_subquery(self, node=None, before=inf):
        where_cond = "node = v.node"
        args = []
        if node:
            where_cond = "node = ? "
            args = [node]
886

887
888
        if before == inf:
            q = ("(select latest_version "
889
890
                 "from nodes "
                 "where %s) ")
891
892
        else:
            q = ("(select max(serial) "
893
894
                 "from versions "
                 "where %s and mtime < ?) ")
895
896
            args += [before]
        return q % where_cond, args
897

898
899
900
901
902
903
    def _construct_latest_versions_subquery(self, nodes=(), before=inf):
        where_cond = ""
        args = []
        if nodes:
            where_cond = "node in (%s) " % ','.join('?' for node in nodes)
            args = nodes
904