rpc.py 20.3 KB
Newer Older
Iustin Pop's avatar
Iustin Pop committed
1
#
Iustin Pop's avatar
Iustin Pop committed
2
3
#

4
# Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011 Google Inc.
Iustin Pop's avatar
Iustin Pop committed
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.


22
"""Inter-node RPC library.
Iustin Pop's avatar
Iustin Pop committed
23
24
25

"""

26
# pylint: disable=C0103,R0201,R0904
Iustin Pop's avatar
Iustin Pop committed
27
28
29
30
31
# C0103: Invalid name, since call_ are not valid
# R0201: Method could be a function, we keep all rpcs instance methods
# as not to change them back and forth between static/instance methods
# if they need to start using instance attributes
# R0904: Too many public methods
Iustin Pop's avatar
Iustin Pop committed
32
33

import os
34
import logging
35
36
import zlib
import base64
37
38
import pycurl
import threading
Iustin Pop's avatar
Iustin Pop committed
39
40
41

from ganeti import utils
from ganeti import objects
42
from ganeti import http
43
from ganeti import serializer
44
from ganeti import constants
45
from ganeti import errors
46
from ganeti import netutils
47
from ganeti import ssconf
48
from ganeti import runtime
49
from ganeti import compat
Iustin Pop's avatar
Iustin Pop committed
50

51
52
53
# Special module generated at build time
from ganeti import _generated_rpc

Iustin Pop's avatar
Iustin Pop committed
54
# pylint has a bug here, doesn't see this import
55
import ganeti.http.client  # pylint: disable=W0611
56

Iustin Pop's avatar
Iustin Pop committed
57

58
59
60
61
62
# Timeout for connecting to nodes (seconds)
_RPC_CONNECT_TIMEOUT = 5

# Extra HTTP headers sent with every RPC request
# NOTE(review): the empty "Expect:" entry presumably suppresses curl's
# automatic "Expect: 100-continue" header -- confirm
_RPC_CLIENT_HEADERS = [
  "Content-type: %s" % http.HTTP_APP_JSON,
  "Expect:",
  ]

# Various time constants for the timeout table
_TMO_URGENT = 60 # one minute
_TMO_FAST = 5 * 60 # five minutes
_TMO_NORMAL = 15 * 60 # 15 minutes
_TMO_SLOW = 3600 # one hour
_TMO_4HRS = 4 * 3600
_TMO_1DAY = 86400

# Timeout table that will be built later by decorators
# Guidelines for choosing timeouts:
# - call used during watcher: timeout -> 1min, _TMO_URGENT
# - trivial (but be sure it is trivial) (e.g. reading a file): 5min, _TMO_FAST
# - other calls: 15 min, _TMO_NORMAL
# - special calls (instance add, etc.): either _TMO_SLOW (1h) or huge timeouts

_TIMEOUTS = {
}

#: Special value to describe an offline host
_OFFLINE = object()

87
88
89
90

def Init():
  """Initializes the module-global HTTP client manager.

  Must be called before using any RPC function and while exactly one thread is
  running.

  """
  # curl_global_init(3) and curl_global_cleanup(3) must be called with only
  # one thread running. This check is just a safety measure -- it doesn't
  # cover all cases.
  assert threading.activeCount() == 1, \
         "Found more than one active thread when initializing pycURL"

  logging.info("Using PycURL %s", pycurl.version)

  pycurl.global_init(pycurl.GLOBAL_ALL)
104
105
106
107
108


def Shutdown():
  """Stops the module-global HTTP client manager.

  Must be called before quitting the program and while exactly one thread is
  running.

  """
  # Counterpart of the pycurl.global_init() call done in Init()
  pycurl.global_cleanup()


def _ConfigRpcCurl(curl):
  """Configures a cURL handle for talking to the node daemon.

  The node daemon certificate file is used as CA bundle, client
  certificate and client key at the same time. The peer certificate is
  verified, the host name check is turned off.

  @param curl: cURL object on which the options are set

  """
  noded_cert = str(constants.NODED_CERT_FILE)

  curl.setopt(pycurl.FOLLOWLOCATION, False)
  curl.setopt(pycurl.CAINFO, noded_cert)
  # Host name verification is disabled, peer verification stays enabled
  curl.setopt(pycurl.SSL_VERIFYHOST, 0)
  curl.setopt(pycurl.SSL_VERIFYPEER, True)
  curl.setopt(pycurl.SSLCERTTYPE, "PEM")
  curl.setopt(pycurl.SSLCERT, noded_cert)
  curl.setopt(pycurl.SSLKEYTYPE, "PEM")
  curl.setopt(pycurl.SSLKEY, noded_cert)
  curl.setopt(pycurl.CONNECTTIMEOUT, _RPC_CONNECT_TIMEOUT)


130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
def _RpcTimeout(secs):
  """Decorator factory registering a timeout for an RPC call.

  The decorated function must be named C{call_<procedure>}; the part after
  the prefix becomes the key under which C{secs} is stored in the global
  timeout table. The function itself is returned unmodified.

  @param secs: timeout value to record for the decorated call

  """
  prefix = "call_"

  def decorator(f):
    fn_name = f.__name__
    assert fn_name.startswith(prefix)
    procedure = fn_name[len(prefix):]
    _TIMEOUTS[procedure] = secs
    return f

  return decorator


145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
def RunWithRPC(fn):
  """Decorator running a function inside an initialized RPC system.

  The RPC system is initialized before the wrapped function is called
  and shut down afterwards, no matter whether the function returned or
  raised. Consequently the function must be called while RPC is not yet
  initialized.

  """
  def wrapper(*args, **kwargs):
    Init()
    try:
      result = fn(*args, **kwargs)
    finally:
      # Always undo Init(), even if fn raised
      Shutdown()
    return result

  return wrapper


162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
def _Compress(data):
  """Encodes a string for transport over RPC.

  Data shorter than 512 bytes is passed through unchanged; anything else
  is compressed with zlib (level 3) and then base64-encoded.

  @type data: str
  @param data: Data
  @rtype: tuple
  @return: Tuple of encoding identifier and encoded data

  """
  if len(data) >= 512:
    compressed = zlib.compress(data, 3)
    return (constants.RPC_ENCODING_ZLIB_BASE64, base64.b64encode(compressed))

  # Not worth compressing
  return (constants.RPC_ENCODING_NONE, data)


182
183
184
185
186
187
188
class RpcResult(object):
  """RPC Result class.

  This class holds an RPC result. It is needed since in multi-node
  calls we can't raise an exception just because one out of many
  failed, and therefore we use this class to encapsulate the result.

  @ivar data: the data payload, for successful results, or None
  @ivar payload: the second element of C{data} if the call succeeded,
      None otherwise
  @ivar call: the name of the RPC call
  @ivar node: the name of the node to which we made the call
  @ivar offline: whether the operation failed because the node was
      offline, as opposed to actual failure; offline=True will always
      imply failed=True, in order to allow simpler checking if
      the user doesn't care about the exact failure mode
  @ivar fail_msg: the error message if the call failed

  """
  def __init__(self, data=None, failed=False, offline=False,
               call=None, node=None):
    """Initializes the result.

    @param data: raw result; a two-element sequence of status and payload
        for calls which were actually made, or the error information if
        C{failed} is set
    @param failed: whether the call failed at the transport level
    @param offline: whether the node was marked offline
    @param call: the name of the RPC call
    @param node: the name of the node

    """
    self.offline = offline
    self.call = call
    self.node = node

    if offline:
      self.fail_msg = "Node is marked offline"
      self.data = self.payload = None
    elif failed:
      self.fail_msg = self._EnsureErr(data)
      self.data = self.payload = None
    else:
      self.data = data
      if not isinstance(self.data, (tuple, list)):
        self.fail_msg = ("RPC layer error: invalid result type (%s)" %
                         type(self.data))
        self.payload = None
      elif len(data) != 2:
        self.fail_msg = ("RPC layer error: invalid result length (%d), "
                         "expected 2" % len(self.data))
        self.payload = None
      elif not self.data[0]:
        # Remote side reported a failure, second element is the message
        self.fail_msg = self._EnsureErr(self.data[1])
        self.payload = None
      else:
        # finally success
        self.fail_msg = None
        self.payload = data[1]

    # Every code path above must have set all public attributes
    for attr_name in ["call", "data", "fail_msg",
                      "node", "offline", "payload"]:
      assert hasattr(self, attr_name), "Missing attribute %s" % attr_name

  @staticmethod
  def _EnsureErr(val):
    """Helper to ensure we return a 'True' value for error."""
    if val:
      return val
    else:
      return "No error information"

  def Raise(self, msg, prereq=False, ecode=None):
    """If the result has failed, raise an OpExecError.

    This is used so that LU code doesn't have to check for each
    result, but instead can call this function.

    @param msg: prefix for the error message; if empty or None a default
        message naming the call and node is used
    @param prereq: if true, L{errors.OpPrereqError} is raised instead of
        L{errors.OpExecError}
    @param ecode: optional error code passed as second exception argument

    """
    if not self.fail_msg:
      return

    if not msg: # one could pass None for default message
      msg = ("Call '%s' to node '%s' has failed: %s" %
             (self.call, self.node, self.fail_msg))
    else:
      msg = "%s: %s" % (msg, self.fail_msg)
    if prereq:
      ec = errors.OpPrereqError
    else:
      ec = errors.OpExecError
    if ecode is not None:
      args = (msg, ecode)
    else:
      args = (msg, )
    raise ec(*args) # pylint: disable=W0142
265
266


267
268
269
def _SsconfResolver(node_list,
                    ssc=ssconf.SimpleStore,
                    nslookup_fn=netutils.Hostname.GetIP):
  """Return addresses for given node names.

  Nodes found in the ssconf primary IP list use that address; for all
  other nodes the name service lookup function is used.

  @type node_list: list
  @param node_list: List of node names
  @type ssc: class
  @param ssc: SimpleStore class that is used to obtain node->ip mappings
  @type nslookup_fn: callable
  @param nslookup_fn: function used to do NS lookup
  @rtype: list of tuple; (string, string)
  @return: List of tuples containing node name and IP address

  """
  ss = ssc()
  iplist = ss.GetNodePrimaryIPList()
  family = ss.GetPrimaryIPFamily()
  # Each entry is split on whitespace into a (name, ip) pair
  ipmap = dict(entry.split() for entry in iplist)

  result = []
  for node in node_list:
    ip = ipmap.get(node)
    if ip is None:
      # Not known to ssconf, fall back to name resolution
      ip = nslookup_fn(node, family=family)
    result.append((node, ip))

  return result


class _StaticResolver:
  """Resolver pairing hosts with a fixed, pre-supplied address list.

  """
  def __init__(self, addresses):
    """Stores the address list used for all later lookups.

    """
    self._addresses = addresses

  def __call__(self, hosts):
    """Pairs each host with the address at the same position.

    The number of hosts must equal the number of stored addresses.

    """
    addresses = self._addresses
    assert len(hosts) == len(addresses)
    return zip(hosts, addresses)

311

312
313
def _CheckConfigNode(name, node):
  """Determines the address to use for a node based on its config object.

  @type name: string
  @param name: Node name
  @type node: L{objects.Node} or None
  @param node: Node object
  @rtype: tuple
  @return: Tuple of node name and address; the address is the node name
    itself if no node object is given, L{_OFFLINE} if the node is marked
    offline and the node's primary IP otherwise

  """
  if node is None:
    # Depend on DNS for name resolution
    ip = name
  elif node.offline:
    ip = _OFFLINE
  else:
    ip = node.primary_ip
  return (name, ip)
Iustin Pop's avatar
Iustin Pop committed
329
330


331
332
def _NodeConfigResolver(single_node_fn, all_nodes_fn, hosts):
  """Calculate node addresses using configuration.

  @type single_node_fn: callable
  @param single_node_fn: called with a node name, returns the node object
    (or None) for that name
  @type all_nodes_fn: callable
  @param all_nodes_fn: called without arguments, returns a mapping from
    node name to node object
  @param hosts: node names to resolve
  @rtype: list of tuple
  @return: one result of L{_CheckConfigNode} per host, in input order

  """
  # Special case for single-host lookups
  if len(hosts) == 1:
    (name, ) = hosts
    return [_CheckConfigNode(name, single_node_fn(name))]
  else:
    all_nodes = all_nodes_fn()
    return [_CheckConfigNode(name, all_nodes.get(name, None))
            for name in hosts]


class _RpcProcessor:
  """Callable sending one RPC procedure to a number of nodes.

  Host names are turned into addresses by the configured resolver. Hosts
  resolved to L{_OFFLINE} are not contacted; an offline L{RpcResult} is
  returned for them instead.

  """
  def __init__(self, resolver, port, lock_monitor_cb=None):
    """Initializes this class.

    @param resolver: callable accepting a list of hostnames, returning a list
      of tuples containing name and IP address (IP address can be the name or
      the special value L{_OFFLINE} to mark offline machines)
    @type port: int
    @param port: TCP port
    @param lock_monitor_cb: Callable for registering with lock monitor

    """
    self._resolver = resolver
    self._port = port
    self._lock_monitor_cb = lock_monitor_cb

  @staticmethod
  def _PrepareRequests(hosts, port, procedure, body, read_timeout):
    """Prepares requests by sorting offline hosts into separate list.

    @param hosts: iterable of (name, address) tuples as returned by the
      resolver
    @param port: TCP port to connect to
    @param procedure: name of the procedure, used as request path
    @param body: request body
    @param read_timeout: read timeout for the requests
    @rtype: tuple
    @return: tuple of two dictionaries keyed by node name; the first holds
      ready-made results for offline hosts, the second the HTTP requests
      for all other hosts

    """
    results = {}
    requests = {}

    for (name, ip) in hosts:
      if ip is _OFFLINE:
        # Node is marked as offline
        results[name] = RpcResult(node=name, offline=True, call=procedure)
      else:
        requests[name] = \
          http.client.HttpClientRequest(str(ip), port,
                                        http.HTTP_PUT, str("/%s" % procedure),
                                        headers=_RPC_CLIENT_HEADERS,
                                        post_data=body,
                                        read_timeout=read_timeout,
                                        nicename="%s/%s" % (name, procedure),
                                        curl_config_fn=_ConfigRpcCurl)

    return (results, requests)

  @staticmethod
  def _CombineResults(results, requests, procedure):
    """Combines pre-computed results for offline hosts with actual call results.

    @param results: dictionary of pre-computed results, updated in place
    @param requests: dictionary of processed requests, keyed by node name
    @param procedure: name of the procedure
    @return: the C{results} dictionary with one L{RpcResult} per node

    """
    for name, req in requests.items():
      if req.success and req.resp_status_code == http.HTTP_OK:
        host_result = RpcResult(data=serializer.LoadJson(req.resp_body),
                                node=name, call=procedure)
      else:
        # TODO: Better error reporting
        if req.error:
          msg = req.error
        else:
          msg = req.resp_body

        logging.error("RPC error in %s on node %s: %s", procedure, name, msg)
        host_result = RpcResult(data=msg, failed=True, node=name,
                                call=procedure)

      results[name] = host_result

    return results

  def __call__(self, hosts, procedure, body, read_timeout=None,
               _req_process_fn=http.client.ProcessRequests):
    """Makes an RPC request to a number of nodes.

    @type hosts: sequence
    @param hosts: Hostnames
    @type procedure: string
    @param procedure: Request path
    @type body: string
    @param body: Request body
    @type read_timeout: int or None
    @param read_timeout: Read timeout for request; if None, the value is
      looked up in the global timeout table using the procedure name
    @param _req_process_fn: function called with the prepared requests to
      actually process them
    @rtype: dict
    @return: dictionary mapping node name to L{RpcResult}

    """
    if read_timeout is None:
      read_timeout = _TIMEOUTS.get(procedure, None)

    assert read_timeout is not None, \
      "Missing RPC read timeout for procedure '%s'" % procedure

    (results, requests) = \
      self._PrepareRequests(self._resolver(hosts), self._port, procedure,
                            str(body), read_timeout)

    _req_process_fn(requests.values(), lock_monitor_cb=self._lock_monitor_cb)

    # A node is either offline or was sent a request, never both
    assert not frozenset(results).intersection(requests)

    return self._CombineResults(results, requests, procedure)
Iustin Pop's avatar
Iustin Pop committed
438
439


440
class RpcRunner(_generated_rpc.RpcClientDefault,
441
442
                _generated_rpc.RpcClientBootstrap,
                _generated_rpc.RpcClientConfig):
443
  """RPC runner class.
Iustin Pop's avatar
Iustin Pop committed
444

445
446
447
  """
  def __init__(self, context):
    """Initializes the RPC runner.

    @type context: C{masterd.GanetiContext}
    @param context: Ganeti context

    """
    # Pylint doesn't recognize multiple inheritance properly, see
    # <http://www.logilab.org/ticket/36586> and
    # <http://www.logilab.org/ticket/35642>
    # pylint: disable=W0233
    _generated_rpc.RpcClientConfig.__init__(self)
    _generated_rpc.RpcClientBootstrap.__init__(self)
    _generated_rpc.RpcClientDefault.__init__(self)

    self._cfg = context.cfg
    # Node addresses are taken from the cluster configuration
    self._proc = _RpcProcessor(compat.partial(_NodeConfigResolver,
                                              self._cfg.GetNodeInfo,
                                              self._cfg.GetAllNodesInfo),
                               netutils.GetDaemonPort(constants.NODED),
                               lock_monitor_cb=context.glm.AddToLockMonitor)
Iustin Pop's avatar
Iustin Pop committed
467

468
  def _InstDict(self, instance, hvp=None, bep=None, osp=None):
    """Convert the given instance to a dict.

    This is done via the instance's ToDict() method and additionally
    we fill the hvparams, beparams, osparams and the nicparams of every
    NIC with the cluster defaults.

    @type instance: L{objects.Instance}
    @param instance: an Instance object
    @type hvp: dict or None
    @param hvp: a dictionary with overridden hypervisor parameters
    @type bep: dict or None
    @param bep: a dictionary with overridden backend parameters
    @type osp: dict or None
    @param osp: a dictionary with overridden os parameters
    @rtype: dict
    @return: the instance dict, with the parameters filled with the
        cluster defaults

    """
    idict = instance.ToDict()
    cluster = self._cfg.GetClusterInfo()
    # Explicit overrides are applied on top of the filled values
    idict["hvparams"] = cluster.FillHV(instance)
    if hvp is not None:
      idict["hvparams"].update(hvp)
    idict["beparams"] = cluster.FillBE(instance)
    if bep is not None:
      idict["beparams"].update(bep)
    idict["osparams"] = cluster.SimpleFillOS(instance.os, instance.osparams)
    if osp is not None:
      idict["osparams"].update(osp)
    for nic in idict["nics"]:
      nic['nicparams'] = objects.FillDict(
        cluster.nicparams[constants.PP_DEFAULT],
        nic['nicparams'])
    return idict

504
505
506
507
508
509
510
511
512
513
514
515
  def _InstDictHvpBep(self, (instance, hvp, bep)):
    """Wrapper for L{_InstDict}.

    """
    return self._InstDict(instance, hvp=hvp, bep=bep)

  def _InstDictOsp(self, (instance, osparams)):
    """Wrapper for L{_InstDict}.

    """
    return self._InstDict(instance, osp=osparams)

516
517
518
  def _Call(self, node_list, procedure, timeout, args):
    """Entry point for automatically generated RPC wrappers.

    @param node_list: names of the nodes to contact
    @param procedure: name of the procedure to call
    @param timeout: read timeout passed on to the RPC processor
    @param args: procedure arguments, serialized to JSON as request body
    @return: the result of the RPC processor call

    """
    body = serializer.DumpJson(args, indent=False)

    return self._proc(node_list, procedure, body, read_timeout=timeout)
523

Michael Hanselmann's avatar
Michael Hanselmann committed
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
  @staticmethod
  def _BlockdevFindPostProc(result):
    """Turns a successful, non-empty payload into a block device status.

    Failed results and results with a C{None} payload are returned as-is.

    """
    if result.fail_msg or result.payload is None:
      return result

    result.payload = objects.BlockDevStatus.FromDict(result.payload)
    return result

  @staticmethod
  def _BlockdevGetMirrorStatusPostProc(result):
    """Converts every payload entry of a successful result.

    Each entry is replaced by the L{objects.BlockDevStatus} built from it;
    failed results are returned untouched.

    """
    if result.fail_msg:
      return result

    statuses = []
    for entry in result.payload:
      statuses.append(objects.BlockDevStatus.FromDict(entry))
    result.payload = statuses

    return result

  @staticmethod
  def _BlockdevGetMirrorStatusMultiPostProc(result):
    """Converts mirror status entries in a multi-node result.

    For every node result which did not fail, each (success, status) pair
    with a true success flag gets its status replaced, in place, by the
    L{objects.BlockDevStatus} built from it.

    """
    for node_result in result.values():
      if not node_result.fail_msg:
        entries = node_result.payload
        for pos, (success, status) in enumerate(entries):
          if success:
            entries[pos] = (success, objects.BlockDevStatus.FromDict(status))

    return result

  @staticmethod
  def _OsGetPostProc(result):
    """Turns a dictionary payload of a successful result into an OS object.

    Failed results and non-dictionary payloads are left alone.

    """
    if result.fail_msg or not isinstance(result.payload, dict):
      return result

    result.payload = objects.OS.FromDict(result.payload)
    return result

  @staticmethod
  def _PrepareFinalizeExportDisks(snap_disks):
    """Flattens a list of snapshot disks for sending over RPC.

    Boolean entries are kept as they are, all other entries are replaced
    by the result of their C{ToDict} method.

    """
    def _Flatten(disk):
      if isinstance(disk, bool):
        return disk
      return disk.ToDict()

    return [_Flatten(disk) for disk in snap_disks]

  @staticmethod
  def _ImpExpStatusPostProc(result):
    """Post-processor for import/export status.

    @rtype: Payload containing list of L{objects.ImportExportStatus} instances
    @return: Returns a list of the state of each named import/export or None if
             a status couldn't be retrieved

    """
    if result.fail_msg:
      return result

    statuses = []
    for entry in result.payload:
      # None entries are passed through unchanged
      if entry is not None:
        entry = objects.ImportExportStatus.FromDict(entry)
      statuses.append(entry)

    result.payload = statuses
    return result

589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
  @staticmethod
  def _EncodeImportExportIO(ieio, ieioargs):
    """Encodes import/export I/O information.

    For raw disk and script I/O the first argument is replaced by the
    result of its C{ToDict} method; for every other I/O type the
    arguments are returned unchanged.

    """
    if ieio == constants.IEIO_RAW_DISK:
      assert len(ieioargs) == 1
      encoded = (ieioargs[0].ToDict(), )
    elif ieio == constants.IEIO_SCRIPT:
      assert len(ieioargs) == 2
      encoded = (ieioargs[0].ToDict(), ieioargs[1])
    else:
      encoded = ieioargs

    return encoded

604
605
606
  @staticmethod
  def _PrepareFileUpload(filename):
    """Loads a file and prepares it for an upload to nodes.

    @param filename: path of the file to read
    @rtype: list
    @return: list of file name, encoded contents (see L{_Compress}), file
      mode, owner and group as returned by the runtime entity lookups for
      the file's uid/gid, access time and modification time

    """
    data = _Compress(utils.ReadFile(filename))
    st = os.stat(filename)
    getents = runtime.GetEnts()
    return [filename, data, st.st_mode, getents.LookupUid(st.st_uid),
            getents.LookupGid(st.st_gid), st.st_atime, st.st_mtime]
614

615
616
617
  #
  # Begin RPC calls
  #
618

Michael Hanselmann's avatar
Michael Hanselmann committed
619
  def call_test_delay(self, node_list, duration, read_timeout=None):
    """Sleep for a fixed time on given node(s).

    This is a multi-node call.

    The read timeout is derived from the requested duration (five seconds
    are added), hence callers must not pass C{read_timeout} themselves.

    """
    assert read_timeout is None
    # Delegate to the generated wrapper explicitly. Calling
    # "self.call_test_delay" here would re-enter this override with a
    # read timeout set and fail the assertion above (or recurse endlessly
    # if assertions are disabled).
    # NOTE(review): assumes the generated wrapper is defined on
    # RpcClientDefault -- confirm against _generated_rpc
    return _generated_rpc.RpcClientDefault.call_test_delay(
      self, node_list, duration, read_timeout=int(duration + 5))
628

629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658

class JobQueueRunner(_generated_rpc.RpcClientJobQueue):
  """RPC client used for job queue calls.

  """
  _Compress = staticmethod(_Compress)

  def __init__(self, context, address_list):
    """Sets up the RPC processor for job queue calls.

    If C{address_list} is None addresses are resolved via ssconf,
    otherwise the given addresses are used as they are.

    """
    _generated_rpc.RpcClientJobQueue.__init__(self)

    if address_list is not None:
      # Caller provided an address list
      resolver = _StaticResolver(address_list)
    else:
      resolver = _SsconfResolver

    noded_port = netutils.GetDaemonPort(constants.NODED)
    self._proc = _RpcProcessor(resolver, noded_port,
                               lock_monitor_cb=context.glm.AddToLockMonitor)

  def _Call(self, node_list, procedure, timeout, args):
    """Serializes the arguments and hands the call to the RPC processor.

    Used by the automatically generated RPC wrappers.

    """
    json_body = serializer.DumpJson(args, indent=False)
    return self._proc(node_list, procedure, json_body, read_timeout=timeout)
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680


class BootstrapRunner(_generated_rpc.RpcClientBootstrap):
  """RPC client used for bootstrap calls.

  """
  def __init__(self):
    """Sets up the RPC processor, resolving addresses via ssconf.

    """
    _generated_rpc.RpcClientBootstrap.__init__(self)

    noded_port = netutils.GetDaemonPort(constants.NODED)
    self._proc = _RpcProcessor(_SsconfResolver, noded_port)

  def _Call(self, node_list, procedure, timeout, args):
    """Serializes the arguments and hands the call to the RPC processor.

    Used by the automatically generated RPC wrappers.

    """
    json_body = serializer.DumpJson(args, indent=False)
    return self._proc(node_list, procedure, json_body, read_timeout=timeout)
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711


class ConfigRunner(_generated_rpc.RpcClientConfig):
  """RPC client used for L{config} calls.

  """
  _PrepareFileUpload = \
    staticmethod(RpcRunner._PrepareFileUpload) # pylint: disable=W0212

  def __init__(self, address_list):
    """Sets up the RPC processor for configuration calls.

    If C{address_list} is None addresses are resolved via ssconf,
    otherwise the given addresses are used as they are.

    """
    _generated_rpc.RpcClientConfig.__init__(self)

    if address_list is not None:
      # Caller provided an address list
      resolver = _StaticResolver(address_list)
    else:
      resolver = _SsconfResolver

    noded_port = netutils.GetDaemonPort(constants.NODED)
    self._proc = _RpcProcessor(resolver, noded_port)

  def _Call(self, node_list, procedure, timeout, args):
    """Serializes the arguments and hands the call to the RPC processor.

    Used by the automatically generated RPC wrappers.

    """
    json_body = serializer.DumpJson(args, indent=False)
    return self._proc(node_list, procedure, json_body, read_timeout=timeout)