Commit 12bce260 authored by Michael Hanselmann's avatar Michael Hanselmann
Browse files

RPC: Compress file upload data

Adding compression to larger amounts of data is more efficient than
transferring it (len(nodes) - 1) times over the network without
compression. We were able to compress a 800KB config file to about
30 KB, which is about 40 KB with Base64 encoding (required due to
the way SimpleJson handles strings).

Reviewed-by: ultrotter
parent 832261fd
......@@ -33,6 +33,8 @@ import subprocess
import random
import logging
import tempfile
import zlib
import base64
from ganeti import errors
from ganeti import utils
......@@ -67,6 +69,25 @@ def _GetSshRunner(cluster_name):
return ssh.SshRunner(cluster_name)
def _Decompress(data):
"""Unpacks data compressed by the RPC client.
@type data: list or tuple
@param data: Data sent by RPC client
@rtype: str
@return: Decompressed data
assert len(data) == 2
(encoding, content) = data
if encoding == constants.RPC_ENCODING_NONE:
return content
elif encoding == constants.RPC_ENCODING_ZLIB_BASE64:
return zlib.decompress(base64.b64decode(content))
raise AssertionError("Unknown data encoding")
def _CleanDirectory(path, exclude=[]):
"""Removes all regular files in a directory.
......@@ -1238,7 +1259,9 @@ def UploadFile(file_name, data, mode, uid, gid, atime, mtime):
" upload targets: '%s'", file_name)
return False
utils.WriteFile(file_name, data=data, mode=mode, uid=uid, gid=gid,
raw_data = _Decompress(data)
utils.WriteFile(file_name, data=raw_data, mode=mode, uid=uid, gid=gid,
atime=atime, mtime=mtime)
return True
......@@ -1949,7 +1972,7 @@ def JobQueueUpdate(file_name, content):
return False
# Write and replace the file atomically
utils.WriteFile(file_name, data=content)
utils.WriteFile(file_name, data=_Decompress(content))
return True
......@@ -240,6 +240,10 @@ DEFAULT_VG = "xenvg"
MIN_VG_SIZE = 20480
# RPC constants
# os related constants
......@@ -33,6 +33,8 @@
import os
import socket
import logging
import zlib
import base64
from ganeti import utils
from ganeti import objects
......@@ -334,6 +336,26 @@ class RpcRunner(object):
return c.GetResults()[node]
def _Compress(data):
"""Compresses a string for transport over RPC.
Small amounts of data are not compressed.
@type data: str
@param data: Data
@rtype: tuple
@return: Encoded data to send
# Small amounts of data are not compressed
if len(data) < 512:
return (constants.RPC_ENCODING_NONE, data)
# Compress with zlib and encode in base64
return (constants.RPC_ENCODING_ZLIB_BASE64,
base64.b64encode(zlib.compress(data, 3)))
# Begin RPC calls
......@@ -688,7 +710,8 @@ class RpcRunner(object):
to optimize the RPC speed
data = utils.ReadFile(file_name)
file_contents = utils.ReadFile(file_name)
data = cls._Compress(file_contents)
st = os.stat(file_name)
params = [file_name, data, st.st_mode, st.st_uid, st.st_gid,
st.st_atime, st.st_mtime]
......@@ -907,7 +930,7 @@ class RpcRunner(object):
return cls._StaticMultiNodeCall(node_list, "jobqueue_update",
[file_name, content],
[file_name, cls._Compress(content)],
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment