Skip to content
Snippets Groups Projects
rpc.py 24.19 KiB
#
#

# Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.


"""Inter-node RPC library.

"""

# pylint: disable=C0103,R0201,R0904
# C0103: Invalid name, since call_ are not valid
# R0201: Method could be a function, we keep all rpcs instance methods
# as not to change them back and forth between static/instance methods
# if they need to start using instance attributes
# R0904: Too many public methods

import logging
import zlib
import base64
import pycurl
import threading

from ganeti import utils
from ganeti import objects
from ganeti import http
from ganeti import serializer
from ganeti import constants
from ganeti import errors
from ganeti import netutils
from ganeti import ssconf
from ganeti import runtime
from ganeti import compat
from ganeti import rpc_defs

# Special module generated at build time
from ganeti import _generated_rpc

# pylint has a bug here, doesn't see this import
import ganeti.http.client  # pylint: disable=W0611


# Timeout for connecting to nodes (seconds)
_RPC_CONNECT_TIMEOUT = 5

_RPC_CLIENT_HEADERS = [
  "Content-type: %s" % http.HTTP_APP_JSON,
  "Expect:",
  ]

# Various time constants for the timeout table
_TMO_URGENT = 60 # one minute
_TMO_FAST = 5 * 60 # five minutes
_TMO_NORMAL = 15 * 60 # 15 minutes
_TMO_SLOW = 3600 # one hour
_TMO_4HRS = 4 * 3600
_TMO_1DAY = 86400

#: Special value to describe an offline host
_OFFLINE = object()


def Init():
  """Initializes the module-global HTTP client manager.

  Must be called before using any RPC function and while exactly one thread is
  running.

  """
  # curl_global_init(3) and curl_global_cleanup(3) must be called with only
  # one thread running. This check is just a safety measure -- it doesn't
  # cover all cases.
  assert threading.activeCount() == 1, \
         "Found more than one active thread when initializing pycURL"

  logging.info("Using PycURL %s", pycurl.version)

  pycurl.global_init(pycurl.GLOBAL_ALL)


def Shutdown():
  """Stops the module-global HTTP client manager.

  Must be called before quitting the program and while exactly one thread is
  running.

  """
  pycurl.global_cleanup()


def _ConfigRpcCurl(curl):
  noded_cert = str(constants.NODED_CERT_FILE)

  curl.setopt(pycurl.FOLLOWLOCATION, False)
  curl.setopt(pycurl.CAINFO, noded_cert)
  curl.setopt(pycurl.SSL_VERIFYHOST, 0)
  curl.setopt(pycurl.SSL_VERIFYPEER, True)
  curl.setopt(pycurl.SSLCERTTYPE, "PEM")
  curl.setopt(pycurl.SSLCERT, noded_cert)
  curl.setopt(pycurl.SSLKEYTYPE, "PEM")
  curl.setopt(pycurl.SSLKEY, noded_cert)
  curl.setopt(pycurl.CONNECTTIMEOUT, _RPC_CONNECT_TIMEOUT)


def RunWithRPC(fn):
  """RPC-wrapper decorator.

  When applied to a function, it runs it with the RPC system
  initialized, and it shutsdown the system afterwards. This means the
  function must be called without RPC being initialized.

  """
  def wrapper(*args, **kwargs):
    Init()
    try:
      return fn(*args, **kwargs)
    finally:
      Shutdown()
  return wrapper


def _Compress(data):
  """Compresses a string for transport over RPC.

  Small amounts of data are not compressed.

  @type data: str
  @param data: Data
  @rtype: tuple
  @return: Encoded data to send

  """
  # Small amounts of data are not compressed
  if len(data) < 512:
    return (constants.RPC_ENCODING_NONE, data)

  # Compress with zlib and encode in base64
  return (constants.RPC_ENCODING_ZLIB_BASE64,
          base64.b64encode(zlib.compress(data, 3)))


class RpcResult(object):
  """RPC Result class.

  This class holds an RPC result. It is needed since in multi-node
  calls we can't raise an exception just because one one out of many
  failed, and therefore we use this class to encapsulate the result.

  @ivar data: the data payload, for successful results, or None
  @ivar call: the name of the RPC call
  @ivar node: the name of the node to which we made the call
  @ivar offline: whether the operation failed because the node was
      offline, as opposed to actual failure; offline=True will always
      imply failed=True, in order to allow simpler checking if
      the user doesn't care about the exact failure mode
  @ivar fail_msg: the error message if the call failed

  """
  def __init__(self, data=None, failed=False, offline=False,
               call=None, node=None):
    self.offline = offline
    self.call = call
    self.node = node

    if offline:
      self.fail_msg = "Node is marked offline"
      self.data = self.payload = None
    elif failed:
      self.fail_msg = self._EnsureErr(data)
      self.data = self.payload = None
    else:
      self.data = data
      if not isinstance(self.data, (tuple, list)):
        self.fail_msg = ("RPC layer error: invalid result type (%s)" %
                         type(self.data))
        self.payload = None
      elif len(data) != 2:
        self.fail_msg = ("RPC layer error: invalid result length (%d), "
                         "expected 2" % len(self.data))
        self.payload = None
      elif not self.data[0]:
        self.fail_msg = self._EnsureErr(self.data[1])
        self.payload = None
      else:
        # finally success
        self.fail_msg = None
        self.payload = data[1]

    for attr_name in ["call", "data", "fail_msg",
                      "node", "offline", "payload"]:
      assert hasattr(self, attr_name), "Missing attribute %s" % attr_name

  @staticmethod
  def _EnsureErr(val):
    """Helper to ensure we return a 'True' value for error."""
    if val:
      return val
    else:
      return "No error information"

  def Raise(self, msg, prereq=False, ecode=None):
    """If the result has failed, raise an OpExecError.

    This is used so that LU code doesn't have to check for each
    result, but instead can call this function.

    """
    if not self.fail_msg:
      return

    if not msg: # one could pass None for default message
      msg = ("Call '%s' to node '%s' has failed: %s" %
             (self.call, self.node, self.fail_msg))
    else:
      msg = "%s: %s" % (msg, self.fail_msg)
    if prereq:
      ec = errors.OpPrereqError
    else:
      ec = errors.OpExecError
    if ecode is not None:
      args = (msg, ecode)
    else:
      args = (msg, )
    raise ec(*args) # pylint: disable=W0142


def _SsconfResolver(ssconf_ips, node_list, _,
                    ssc=ssconf.SimpleStore,
                    nslookup_fn=netutils.Hostname.GetIP):
  """Return addresses for given node names.

  @type ssconf_ips: bool
  @param ssconf_ips: Use the ssconf IPs
  @type node_list: list
  @param node_list: List of node names
  @type ssc: class
  @param ssc: SimpleStore class that is used to obtain node->ip mappings
  @type nslookup_fn: callable
  @param nslookup_fn: function use to do NS lookup
  @rtype: list of tuple; (string, string)
  @return: List of tuples containing node name and IP address

  """
  ss = ssc()
  family = ss.GetPrimaryIPFamily()

  if ssconf_ips:
    iplist = ss.GetNodePrimaryIPList()
    ipmap = dict(entry.split() for entry in iplist)
  else:
    ipmap = {}

  result = []
  for node in node_list:
    ip = ipmap.get(node)
    if ip is None:
      ip = nslookup_fn(node, family=family)
    result.append((node, ip))

  return result


class _StaticResolver:
  def __init__(self, addresses):
    """Initializes this class.

    """
    self._addresses = addresses

  def __call__(self, hosts, _):
    """Returns static addresses for hosts.

    """
    assert len(hosts) == len(self._addresses)
    return zip(hosts, self._addresses)


def _CheckConfigNode(name, node, accept_offline_node):
  """Checks if a node is online.

  @type name: string
  @param name: Node name
  @type node: L{objects.Node} or None
  @param node: Node object

  """
  if node is None:
    # Depend on DNS for name resolution
    ip = name
  elif node.offline and not accept_offline_node:
    ip = _OFFLINE
  else:
    ip = node.primary_ip
  return (name, ip)


def _NodeConfigResolver(single_node_fn, all_nodes_fn, hosts, opts):
  """Calculate node addresses using configuration.

  """
  accept_offline_node = (opts is rpc_defs.ACCEPT_OFFLINE_NODE)

  assert accept_offline_node or opts is None, "Unknown option"

  # Special case for single-host lookups
  if len(hosts) == 1:
    (name, ) = hosts
    return [_CheckConfigNode(name, single_node_fn(name), accept_offline_node)]
  else:
    all_nodes = all_nodes_fn()
    return [_CheckConfigNode(name, all_nodes.get(name, None),
                             accept_offline_node)
            for name in hosts]


class _RpcProcessor:
  def __init__(self, resolver, port, lock_monitor_cb=None):
    """Initializes this class.

    @param resolver: callable accepting a list of hostnames, returning a list
      of tuples containing name and IP address (IP address can be the name or
      the special value L{_OFFLINE} to mark offline machines)
    @type port: int
    @param port: TCP port
    @param lock_monitor_cb: Callable for registering with lock monitor

    """
    self._resolver = resolver
    self._port = port
    self._lock_monitor_cb = lock_monitor_cb

  @staticmethod
  def _PrepareRequests(hosts, port, procedure, body, read_timeout):
    """Prepares requests by sorting offline hosts into separate list.

    @type body: dict
    @param body: a dictionary with per-host body data

    """
    results = {}
    requests = {}

    assert isinstance(body, dict)
    assert len(body) == len(hosts)
    assert compat.all(isinstance(v, str) for v in body.values())
    assert frozenset(map(compat.fst, hosts)) == frozenset(body.keys()), \
        "%s != %s" % (hosts, body.keys())

    for (name, ip) in hosts:
      if ip is _OFFLINE:
        # Node is marked as offline
        results[name] = RpcResult(node=name, offline=True, call=procedure)
      else:
        requests[name] = \
          http.client.HttpClientRequest(str(ip), port,
                                        http.HTTP_POST, str("/%s" % procedure),
                                        headers=_RPC_CLIENT_HEADERS,
                                        post_data=body[name],
                                        read_timeout=read_timeout,
                                        nicename="%s/%s" % (name, procedure),
                                        curl_config_fn=_ConfigRpcCurl)

    return (results, requests)

  @staticmethod
  def _CombineResults(results, requests, procedure):
    """Combines pre-computed results for offline hosts with actual call results.

    """
    for name, req in requests.items():
      if req.success and req.resp_status_code == http.HTTP_OK:
        host_result = RpcResult(data=serializer.LoadJson(req.resp_body),
                                node=name, call=procedure)
      else:
        # TODO: Better error reporting
        if req.error:
          msg = req.error
        else:
          msg = req.resp_body

        logging.error("RPC error in %s on node %s: %s", procedure, name, msg)
        host_result = RpcResult(data=msg, failed=True, node=name,
                                call=procedure)

      results[name] = host_result

    return results

  def __call__(self, hosts, procedure, body, read_timeout, resolver_opts,
               _req_process_fn=None):
    """Makes an RPC request to a number of nodes.

    @type hosts: sequence
    @param hosts: Hostnames
    @type procedure: string
    @param procedure: Request path
    @type body: dictionary
    @param body: dictionary with request bodies per host
    @type read_timeout: int or None
    @param read_timeout: Read timeout for request

    """
    assert read_timeout is not None, \
      "Missing RPC read timeout for procedure '%s'" % procedure

    if _req_process_fn is None:
      _req_process_fn = http.client.ProcessRequests

    (results, requests) = \
      self._PrepareRequests(self._resolver(hosts, resolver_opts), self._port,
                            procedure, body, read_timeout)

    _req_process_fn(requests.values(), lock_monitor_cb=self._lock_monitor_cb)

    assert not frozenset(results).intersection(requests)

    return self._CombineResults(results, requests, procedure)


class _RpcClientBase:
  def __init__(self, resolver, encoder_fn, lock_monitor_cb=None,
               _req_process_fn=None):
    """Initializes this class.

    """
    proc = _RpcProcessor(resolver,
                         netutils.GetDaemonPort(constants.NODED),
                         lock_monitor_cb=lock_monitor_cb)
    self._proc = compat.partial(proc, _req_process_fn=_req_process_fn)
    self._encoder = compat.partial(self._EncodeArg, encoder_fn)

  @staticmethod
  def _EncodeArg(encoder_fn, (argkind, value)):
    """Encode argument.

    """
    if argkind is None:
      return value
    else:
      return encoder_fn(argkind)(value)

  def _Call(self, cdef, node_list, args):
    """Entry point for automatically generated RPC wrappers.

    """
    (procedure, _, resolver_opts, timeout, argdefs,
     prep_fn, postproc_fn, _) = cdef

    if callable(timeout):
      read_timeout = timeout(args)
    else:
      read_timeout = timeout

    if callable(resolver_opts):
      req_resolver_opts = resolver_opts(args)
    else:
      req_resolver_opts = resolver_opts

    if len(args) != len(argdefs):
      raise errors.ProgrammerError("Number of passed arguments doesn't match")

    enc_args = map(self._encoder, zip(map(compat.snd, argdefs), args))
    if prep_fn is None:
      # for a no-op prep_fn, we serialise the body once, and then we
      # reuse it in the dictionary values
      body = serializer.DumpJson(enc_args)
      pnbody = dict((n, body) for n in node_list)
    else:
      # for a custom prep_fn, we pass the encoded arguments and the
      # node name to the prep_fn, and we serialise its return value
      assert callable(prep_fn)
      pnbody = dict((n, serializer.DumpJson(prep_fn(n, enc_args)))
                    for n in node_list)

    result = self._proc(node_list, procedure, pnbody, read_timeout,
                        req_resolver_opts)

    if postproc_fn:
      return dict(map(lambda (key, value): (key, postproc_fn(value)),
                      result.items()))
    else:
      return result


def _ObjectToDict(value):
  """Converts an object to a dictionary.

  @note: See L{objects}.

  """
  return value.ToDict()


def _ObjectListToDict(value):
  """Converts a list of L{objects} to dictionaries.

  """
  return map(_ObjectToDict, value)


def _EncodeNodeToDiskDict(value):
  """Encodes a dictionary with node name as key and disk objects as values.

  """
  return dict((name, _ObjectListToDict(disks))
              for name, disks in value.items())


def _PrepareFileUpload(getents_fn, filename):
  """Loads a file and prepares it for an upload to nodes.

  """
  statcb = utils.FileStatHelper()
  data = _Compress(utils.ReadFile(filename, preread=statcb))
  st = statcb.st

  if getents_fn is None:
    getents_fn = runtime.GetEnts

  getents = getents_fn()

  return [filename, data, st.st_mode, getents.LookupUid(st.st_uid),
          getents.LookupGid(st.st_gid), st.st_atime, st.st_mtime]


def _PrepareFinalizeExportDisks(snap_disks):
  """Encodes disks for finalizing export.

  """
  flat_disks = []

  for disk in snap_disks:
    if isinstance(disk, bool):
      flat_disks.append(disk)
    else:
      flat_disks.append(disk.ToDict())

  return flat_disks


def _EncodeImportExportIO((ieio, ieioargs)):
  """Encodes import/export I/O information.

  """
  if ieio == constants.IEIO_RAW_DISK:
    assert len(ieioargs) == 1
    return (ieio, (ieioargs[0].ToDict(), ))

  if ieio == constants.IEIO_SCRIPT:
    assert len(ieioargs) == 2
    return (ieio, (ieioargs[0].ToDict(), ieioargs[1]))

  return (ieio, ieioargs)


def _EncodeBlockdevRename(value):
  """Encodes information for renaming block devices.

  """
  return [(d.ToDict(), uid) for d, uid in value]


def _AnnotateDParamsDRBD(disk, (drbd_params, data_params, meta_params)):
  """Annotates just DRBD disks layouts.

  """
  assert disk.dev_type == constants.LD_DRBD8

  disk.params = objects.FillDict(drbd_params, disk.params)
  (dev_data, dev_meta) = disk.children
  dev_data.params = objects.FillDict(data_params, dev_data.params)
  dev_meta.params = objects.FillDict(meta_params, dev_meta.params)

  return disk


def _AnnotateDParamsGeneric(disk, (params, )):
  """Generic disk parameter annotation routine.

  """
  assert disk.dev_type != constants.LD_DRBD8

  disk.params = objects.FillDict(params, disk.params)

  return disk


def AnnotateDiskParams(template, disks, disk_params):
  """Annotates the disk objects with the disk parameters.

  @param template: The disk template used
  @param disks: The list of disks objects to annotate
  @param disk_params: The disk paramaters for annotation
  @returns: A list of disk objects annotated

  """
  ld_params = objects.Disk.ComputeLDParams(template, disk_params)

  if template == constants.DT_DRBD8:
    annotation_fn = _AnnotateDParamsDRBD
  elif template == constants.DT_DISKLESS:
    annotation_fn = lambda disk, _: disk
  else:
    annotation_fn = _AnnotateDParamsGeneric

  new_disks = []
  for disk in disks:
    new_disks.append(annotation_fn(disk.Copy(), ld_params))

  return new_disks


#: Generic encoders
_ENCODERS = {
  rpc_defs.ED_OBJECT_DICT: _ObjectToDict,
  rpc_defs.ED_OBJECT_DICT_LIST: _ObjectListToDict,
  rpc_defs.ED_NODE_TO_DISK_DICT: _EncodeNodeToDiskDict,
  rpc_defs.ED_COMPRESS: _Compress,
  rpc_defs.ED_FINALIZE_EXPORT_DISKS: _PrepareFinalizeExportDisks,
  rpc_defs.ED_IMPEXP_IO: _EncodeImportExportIO,
  rpc_defs.ED_BLOCKDEV_RENAME: _EncodeBlockdevRename,
  }


class RpcRunner(_RpcClientBase,
                _generated_rpc.RpcClientDefault,
                _generated_rpc.RpcClientBootstrap,
                _generated_rpc.RpcClientDnsOnly,
                _generated_rpc.RpcClientConfig):
  """RPC runner class.

  """
  def __init__(self, cfg, lock_monitor_cb, _req_process_fn=None, _getents=None):
    """Initialized the RPC runner.

    @type cfg: L{config.ConfigWriter}
    @param cfg: Configuration
    @type lock_monitor_cb: callable
    @param lock_monitor_cb: Lock monitor callback

    """
    self._cfg = cfg

    encoders = _ENCODERS.copy()

    encoders.update({
      # Encoders requiring configuration object
      rpc_defs.ED_INST_DICT: self._InstDict,
      rpc_defs.ED_INST_DICT_HVP_BEP_DP: self._InstDictHvpBepDp,
      rpc_defs.ED_INST_DICT_OSP_DP: self._InstDictOspDp,

      # Encoders annotating disk parameters
      rpc_defs.ED_DISKS_DICT_DP: self._DisksDictDP,
      rpc_defs.ED_SINGLE_DISK_DICT_DP: self._SingleDiskDictDP,

      # Encoders with special requirements
      rpc_defs.ED_FILE_DETAILS: compat.partial(_PrepareFileUpload, _getents),
      })

    # Resolver using configuration
    resolver = compat.partial(_NodeConfigResolver, cfg.GetNodeInfo,
                              cfg.GetAllNodesInfo)

    # Pylint doesn't recognize multiple inheritance properly, see
    # <http://www.logilab.org/ticket/36586> and
    # <http://www.logilab.org/ticket/35642>
    # pylint: disable=W0233
    _RpcClientBase.__init__(self, resolver, encoders.get,
                            lock_monitor_cb=lock_monitor_cb,
                            _req_process_fn=_req_process_fn)
    _generated_rpc.RpcClientConfig.__init__(self)
    _generated_rpc.RpcClientBootstrap.__init__(self)
    _generated_rpc.RpcClientDnsOnly.__init__(self)
    _generated_rpc.RpcClientDefault.__init__(self)

  def _InstDict(self, instance, hvp=None, bep=None, osp=None):
    """Convert the given instance to a dict.

    This is done via the instance's ToDict() method and additionally
    we fill the hvparams with the cluster defaults.

    @type instance: L{objects.Instance}
    @param instance: an Instance object
    @type hvp: dict or None
    @param hvp: a dictionary with overridden hypervisor parameters
    @type bep: dict or None
    @param bep: a dictionary with overridden backend parameters
    @type osp: dict or None
    @param osp: a dictionary with overridden os parameters
    @rtype: dict
    @return: the instance dict, with the hvparams filled with the
        cluster defaults

    """
    idict = instance.ToDict()
    cluster = self._cfg.GetClusterInfo()
    idict["hvparams"] = cluster.FillHV(instance)
    if hvp is not None:
      idict["hvparams"].update(hvp)
    idict["beparams"] = cluster.FillBE(instance)
    if bep is not None:
      idict["beparams"].update(bep)
    idict["osparams"] = cluster.SimpleFillOS(instance.os, instance.osparams)
    if osp is not None:
      idict["osparams"].update(osp)
    for nic in idict["nics"]:
      nic["nicparams"] = objects.FillDict(
        cluster.nicparams[constants.PP_DEFAULT],
        nic["nicparams"])
    idict["disks"] = self._DisksDictDP((instance.disks, instance))
    return idict

  def _InstDictHvpBepDp(self, (instance, hvp, bep)):
    """Wrapper for L{_InstDict}.

    """
    return self._InstDict(instance, hvp=hvp, bep=bep)

  def _InstDictOspDp(self, (instance, osparams)):
    """Wrapper for L{_InstDict}.

    """
    return self._InstDict(instance, osp=osparams)

  def _DisksDictDP(self, (disks, instance)):
    """Wrapper for L{AnnotateDiskParams}.

    """
    diskparams = self._cfg.GetInstanceDiskParams(instance)
    return [disk.ToDict()
            for disk in AnnotateDiskParams(instance.disk_template,
                                           disks, diskparams)]

  def _SingleDiskDictDP(self, (disk, instance)):
    """Wrapper for L{AnnotateDiskParams}.

    """
    (anno_disk,) = self._DisksDictDP(([disk], instance))
    return anno_disk


class JobQueueRunner(_RpcClientBase, _generated_rpc.RpcClientJobQueue):
  """RPC wrappers for job queue.

  """
  def __init__(self, context, address_list):
    """Initializes this class.

    """
    if address_list is None:
      resolver = compat.partial(_SsconfResolver, True)
    else:
      # Caller provided an address list
      resolver = _StaticResolver(address_list)

    _RpcClientBase.__init__(self, resolver, _ENCODERS.get,
                            lock_monitor_cb=context.glm.AddToLockMonitor)
    _generated_rpc.RpcClientJobQueue.__init__(self)


class BootstrapRunner(_RpcClientBase,
                      _generated_rpc.RpcClientBootstrap,
                      _generated_rpc.RpcClientDnsOnly):
  """RPC wrappers for bootstrapping.

  """
  def __init__(self):
    """Initializes this class.

    """
    # Pylint doesn't recognize multiple inheritance properly, see
    # <http://www.logilab.org/ticket/36586> and
    # <http://www.logilab.org/ticket/35642>
    # pylint: disable=W0233
    _RpcClientBase.__init__(self, compat.partial(_SsconfResolver, True),
                            _ENCODERS.get)
    _generated_rpc.RpcClientBootstrap.__init__(self)
    _generated_rpc.RpcClientDnsOnly.__init__(self)


class DnsOnlyRunner(_RpcClientBase, _generated_rpc.RpcClientDnsOnly):
  """RPC wrappers for calls using only DNS.

  """
  def __init__(self):
    """Initialize this class.

    """
    _RpcClientBase.__init__(self, compat.partial(_SsconfResolver, False),
                            _ENCODERS.get)
    _generated_rpc.RpcClientDnsOnly.__init__(self)


class ConfigRunner(_RpcClientBase, _generated_rpc.RpcClientConfig):
  """RPC wrappers for L{config}.

  """
  def __init__(self, context, address_list, _req_process_fn=None,
               _getents=None):
    """Initializes this class.

    """
    if context:
      lock_monitor_cb = context.glm.AddToLockMonitor
    else:
      lock_monitor_cb = None

    if address_list is None:
      resolver = compat.partial(_SsconfResolver, True)
    else:
      # Caller provided an address list
      resolver = _StaticResolver(address_list)

    encoders = _ENCODERS.copy()

    encoders.update({
      rpc_defs.ED_FILE_DETAILS: compat.partial(_PrepareFileUpload, _getents),
      })

    _RpcClientBase.__init__(self, resolver, encoders.get,
                            lock_monitor_cb=lock_monitor_cb,
                            _req_process_fn=_req_process_fn)
    _generated_rpc.RpcClientConfig.__init__(self)