# Copyright (C) 2010-2014 GRNET S.A.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.

from hashlib import new as newhasher
from binascii import hexlify
18
import ConfigParser
19
20
21
22
23
24
25
26

from context_archipelago import ArchipelagoObject, file_sync_read_chunks
from archipelago.common import (
    Request,
    xseg_reply_info,
    string_at,
    )

Chrysostomos Nanakos's avatar
Chrysostomos Nanakos committed
27
28
29
30
from pithos.workers import (
    glue,
    monkey,
    )
31
32
33

# Applied once at import time, before any Request objects are built by the
# blocker below. NOTE(review): presumably patches archipelago's Request
# class (imported above); the exact changes live in pithos.workers.monkey.
monkey.patch_Request()

Chrysostomos Nanakos's avatar
Chrysostomos Nanakos committed
34

35
36
37
38
39
40
41
42
43
44
class ArchipelagoBlocker(object):
    """Content-addressed block store backed by Archipelago objects.

    Each block is stored as an Archipelago object named after the hex
    digest of its contents. Digests are computed with ``hashtype`` over
    the block with trailing NUL bytes stripped, and retrieved blocks are
    NUL-padded back to ``blocksize``.

    Required constructor parameters: blocksize, hashtype and
    archipelago_cfile (path to the Archipelago configuration file whose
    ``[mapperd] blockerb_port`` option selects the destination port).
    """

    blocksize = None
    blockpool = None
    hashtype = None

    def __init__(self, **params):
        """Read the Archipelago config and set up hashing parameters.

        Raises ValueError if ``hashtype`` is not supported by hashlib.
        """
        cfg = ConfigParser.ConfigParser()
        # Close the configuration file once parsed instead of leaking it.
        with open(params['archipelago_cfile']) as cfg_file:
            cfg.readfp(cfg_file)

        blocksize = params['blocksize']
        hashtype = params['hashtype']
        try:
            hasher = newhasher(hashtype)
        except ValueError:
            msg = "Variable hashtype '%s' is not available from hashlib"
            raise ValueError(msg % (hashtype,))

        # Digest of the empty string. Because block_hash strips trailing
        # NULs, this is also the digest of an all-NUL block.
        hasher.update("")
        emptyhash = hasher.digest()

        self.blocksize = blocksize
        # NOTE(review): I/O context pool shared via pithos' WorkerGlue.
        self.ioctx_pool = glue.WorkerGlue.ioctx_pool
        self.dst_port = cfg.getint('mapperd', 'blockerb_port')
        self.hashtype = hashtype
        self.hashlen = len(emptyhash)
        self.emptyhash = emptyhash

    def _pad(self, block):
        """Right-pad ``block`` with NUL bytes up to ``self.blocksize``."""
        return block + ('\x00' * (self.blocksize - len(block)))

    def _get_rear_block(self, blkhash, create=0):
        """Return the ArchipelagoObject named after ``blkhash`` (raw digest).

        ``create`` is passed through to ArchipelagoObject; block_stor passes
        1 when writing and block_retr passes 0 when reading.
        """
        name = hexlify(blkhash)
        return ArchipelagoObject(name, self.ioctx_pool, self.dst_port, create)

    def _check_rear_block(self, blkhash):
        """Return True if an object for ``blkhash`` (raw digest) exists."""
        filename = hexlify(blkhash)
        ioctx = self.ioctx_pool.pool_get()
        # Always return the request and the pooled ioctx, even if the
        # request raises while being submitted or awaited.
        try:
            req = Request.get_info_request(ioctx, self.dst_port, filename)
            try:
                req.submit()
                req.wait()
                return bool(req.success())
            finally:
                req.put()
        finally:
            self.ioctx_pool.pool_put(ioctx)

    def block_hash(self, data):
        """Hash a block of data, ignoring trailing NUL padding."""
        hasher = newhasher(self.hashtype)
        hasher.update(data.rstrip('\x00'))
        return hasher.digest()

    def block_ping(self, hashes):
        """Check hashes for existence and
           return those missing from block storage.

        Each missing hash is reported once, in first-seen order.
        """
        notfound = []
        reported = set()  # O(1) duplicate check instead of scanning notfound

        for h in hashes:
            if h in reported:
                continue
            if not self._check_rear_block(h):
                reported.add(h)
                notfound.append(h)

        return notfound

    def block_retr(self, hashes):
        """Retrieve blocks from storage by their raw hashes.

        Returns the blocks, each NUL-padded to blocksize, in the order of
        ``hashes``. The empty-block hash yields an all-NUL block without
        touching storage. Retrieval stops at the first block that cannot
        be opened or read, so the result may be shorter than ``hashes``.
        """
        blocksize = self.blocksize
        blocks = []
        append = blocks.append

        for h in hashes:
            if h == self.emptyhash:
                append(self._pad(''))
                continue
            # Reset per hash: otherwise an object yielding no data would be
            # silently replaced by the previously retrieved block.
            block = None
            with self._get_rear_block(h, 0) as rbl:
                if not rbl:
                    break
                for block in rbl.sync_read_chunks(blocksize, 1, 0):
                    break  # there should be just one block there
            if not block:
                break
            append(self._pad(block))

        return blocks

    def block_retr_archipelago(self, hashes):
        """Retrieve blocks from storage by their hex-encoded hashes.

        Unlike block_retr, ``hashes`` are hex strings used directly as
        object names. Every returned block is NUL-padded to blocksize.
        Raises Exception("Bad block file.") if an object's info request
        fails, and Exception("Cannot retrieve Archipelago data.") if its
        read request fails.
        """
        blocks = []
        append = blocks.append

        ioctx = self.ioctx_pool.pool_get()
        archip_emptyhash = hexlify(self.emptyhash)

        try:
            for h in hashes:
                if h == archip_emptyhash:
                    append(self._pad(''))
                    continue

                # Ask for the object's size first.
                req = Request.get_info_request(ioctx, self.dst_port, h)
                try:
                    req.submit()
                    req.wait()
                    if not req.success():
                        raise Exception("Bad block file.")
                    info = req.get_data(_type=xseg_reply_info)
                    size = info.contents.size
                finally:
                    req.put()

                # Then read exactly that many bytes.
                req_data = Request.get_read_request(ioctx, self.dst_port, h,
                                                    size=size)
                try:
                    req_data.submit()
                    req_data.wait()
                    if not req_data.success():
                        raise Exception("Cannot retrieve Archipelago data.")
                    # Copy the data out before the request is released.
                    append(self._pad(string_at(req_data.get_data(), size)))
                finally:
                    req_data.put()
        finally:
            self.ioctx_pool.pool_put(ioctx)

        return blocks

    def block_stor(self, blocklist):
        """Store a bunch of blocks and return (hashes, missing).
           Hashes is a list of the hashes of the blocks,
           missing is a list of indices in that list indicating
           which blocks were missing from the store (and were written).
        """
        block_hash = self.block_hash
        hashlist = [block_hash(b) for b in blocklist]
        missing = [i for i, h in enumerate(hashlist)
                   if not self._check_rear_block(h)]
        for i in missing:
            with self._get_rear_block(hashlist[i], 1) as rbl:
                rbl.sync_write(blocklist[i])  # XXX: verify?

        return hashlist, missing

    def block_delta(self, blkhash, offset, data):
        """Construct and store a new block from a given block
           and a data 'patch' applied at offset.

        Returns (hash of the new block, flag), where flag is 1 if the new
        block was missing from the store and has now been written, else 0.
        Returns (None, None) if offset is beyond the block, data is empty,
        or the original block cannot be retrieved.
        """
        blocksize = self.blocksize
        if offset >= blocksize or not data:
            return None, None

        block = self.block_retr((blkhash,))
        if not block:
            return None, None

        block = block[0]
        newblock = block[:offset] + data
        if len(newblock) > blocksize:
            newblock = newblock[:blocksize]
        elif len(newblock) < blocksize:
            # Keep the tail of the original block after the patched range.
            newblock += block[len(newblock):]

        hashes, missing = self.block_stor((newblock,))
        return hashes[0], 1 if missing else 0

    def block_hash_file(self, archipelagoobject):
        """Return the list of hashes (hashes map)
           for the blocks in a buffered file.
           Helper method, does not affect store.
        """
        block_hash = self.block_hash
        return [block_hash(block)
                for block in file_sync_read_chunks(archipelagoobject,
                                                   self.blocksize, 1, 0)]

    def block_stor_file(self, archipelagoobject):
        """Read blocks from buffered file object and store them. Return:
           (bytes read, list of hashes, list of hashes that were missing)

        Note: the third element is built from block_stor's ``missing``
        results, which are per-call indices (always 0 here), not hashes.
        """
        blocksize = self.blocksize
        block_stor = self.block_stor
        hashlist = []
        hextend = hashlist.extend
        storedlist = []
        sextend = storedlist.extend
        lastsize = 0

        for block in file_sync_read_chunks(archipelagoobject, blocksize, 1, 0):
            hl, sl = block_stor((block,))
            hextend(hl)
            sextend(sl)
            lastsize = len(block)

        # All blocks except the last are full-sized.
        size = (len(hashlist) - 1) * blocksize + lastsize if hashlist else 0
        return size, hashlist, storedlist