diff --git a/lib/backend.py b/lib/backend.py index 8a5aa8cb2fd7b734c63bb3e24fcf32da0257afdd..e2aac3bb2084705ec4ee732dd66f56e924349adf 100644 --- a/lib/backend.py +++ b/lib/backend.py @@ -33,6 +33,8 @@ import subprocess import random import logging import tempfile +import zlib +import base64 from ganeti import errors from ganeti import utils @@ -67,6 +69,25 @@ def _GetSshRunner(cluster_name): return ssh.SshRunner(cluster_name) +def _Decompress(data): + """Unpacks data compressed by the RPC client. + + @type data: list or tuple + @param data: Data sent by RPC client + @rtype: str + @return: Decompressed data + + """ + assert len(data) == 2 + (encoding, content) = data + if encoding == constants.RPC_ENCODING_NONE: + return content + elif encoding == constants.RPC_ENCODING_ZLIB_BASE64: + return zlib.decompress(base64.b64decode(content)) + else: + raise AssertionError("Unknown data encoding") + + def _CleanDirectory(path, exclude=[]): """Removes all regular files in a directory. @@ -1238,7 +1259,9 @@ def UploadFile(file_name, data, mode, uid, gid, atime, mtime): " upload targets: '%s'", file_name) return False - utils.WriteFile(file_name, data=data, mode=mode, uid=uid, gid=gid, + raw_data = _Decompress(data) + + utils.WriteFile(file_name, data=raw_data, mode=mode, uid=uid, gid=gid, atime=atime, mtime=mtime) return True @@ -1949,7 +1972,7 @@ def JobQueueUpdate(file_name, content): return False # Write and replace the file atomically - utils.WriteFile(file_name, data=content) + utils.WriteFile(file_name, data=_Decompress(content)) return True diff --git a/lib/constants.py b/lib/constants.py index f9e48389bd8185a8f55c184035916ce57e1d2bb0..2c05be4e9641b2d470e553aa567620a324822215 100644 --- a/lib/constants.py +++ b/lib/constants.py @@ -240,6 +240,10 @@ DEFAULT_VG = "xenvg" BIND_ADDRESS_GLOBAL = "0.0.0.0" MIN_VG_SIZE = 20480 +# RPC constants +(RPC_ENCODING_NONE, + RPC_ENCODING_ZLIB_BASE64) = range(2) + # os related constants OS_VALID_STATUS = "VALID" OS_SCRIPT_CREATE = 'create' diff --git a/lib/rpc.py b/lib/rpc.py index 1e2ac03df14ebefbacb0a4553b3dfabe2e39d1df..534675648fc77f098e3a8f01fc1f33451a5ac046 100644 --- a/lib/rpc.py +++ b/lib/rpc.py @@ -33,6 +33,8 @@ import os import socket import logging +import zlib +import base64 from ganeti import utils from ganeti import objects @@ -334,6 +336,26 @@ class RpcRunner(object): c.ConnectNode(node) return c.GetResults()[node] + @staticmethod + def _Compress(data): + """Compresses a string for transport over RPC. + + Small amounts of data are not compressed. + + @type data: str + @param data: Data + @rtype: tuple + @return: Encoded data to send + + """ + # Small amounts of data are not compressed + if len(data) < 512: + return (constants.RPC_ENCODING_NONE, data) + + # Compress with zlib and encode in base64 + return (constants.RPC_ENCODING_ZLIB_BASE64, + base64.b64encode(zlib.compress(data, 3))) + # # Begin RPC calls # @@ -688,7 +710,8 @@ class RpcRunner(object): to optimize the RPC speed """ - data = utils.ReadFile(file_name) + file_contents = utils.ReadFile(file_name) + data = cls._Compress(file_contents) st = os.stat(file_name) params = [file_name, data, st.st_mode, st.st_uid, st.st_gid, st.st_atime, st.st_mtime] @@ -907,7 +930,7 @@ class RpcRunner(object): """ return cls._StaticMultiNodeCall(node_list, "jobqueue_update", - [file_name, content], + [file_name, cls._Compress(content)], address_list=address_list) @classmethod