rpc.py 25.9 KB
Newer Older
Iustin Pop's avatar
Iustin Pop committed
1
#
Iustin Pop's avatar
Iustin Pop committed
2
3
#

Dimitris Aragiorgis's avatar
Dimitris Aragiorgis committed
4
# Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011, 2012 Google Inc.
Iustin Pop's avatar
Iustin Pop committed
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.


22
"""Inter-node RPC library.
Iustin Pop's avatar
Iustin Pop committed
23
24
25

"""

26
# pylint: disable=C0103,R0201,R0904
Iustin Pop's avatar
Iustin Pop committed
27
28
29
30
31
# C0103: Invalid name, since call_ are not valid
# R0201: Method could be a function, we keep all rpcs instance methods
# as not to change them back and forth between static/instance methods
# if they need to start using instance attributes
# R0904: Too many public methods
Iustin Pop's avatar
Iustin Pop committed
32

33
import logging
34
35
import zlib
import base64
36
37
import pycurl
import threading
Dimitris Aragiorgis's avatar
Dimitris Aragiorgis committed
38
import copy
Iustin Pop's avatar
Iustin Pop committed
39
40
41

from ganeti import utils
from ganeti import objects
42
from ganeti import http
43
from ganeti import serializer
44
from ganeti import constants
45
from ganeti import errors
46
from ganeti import netutils
47
from ganeti import ssconf
48
from ganeti import runtime
49
from ganeti import compat
50
from ganeti import rpc_defs
51
from ganeti import pathutils
52
from ganeti import vcluster
Iustin Pop's avatar
Iustin Pop committed
53

54
55
56
# Special module generated at build time
from ganeti import _generated_rpc

Iustin Pop's avatar
Iustin Pop committed
57
# pylint has a bug here, doesn't see this import
58
import ganeti.http.client  # pylint: disable=W0611
59

Iustin Pop's avatar
Iustin Pop committed
60

61
62
_RPC_CLIENT_HEADERS = [
  "Content-type: %s" % http.HTTP_APP_JSON,
Iustin Pop's avatar
Iustin Pop committed
63
  "Expect:",
64
  ]
65

66
67
68
#: Special value to describe an offline host
_OFFLINE = object()

69
70
71
72

def Init():
  """Initializes the module-global HTTP client manager.

73
74
  Must be called before using any RPC function and while exactly one thread is
  running.
75
76

  """
77
78
79
80
81
  # curl_global_init(3) and curl_global_cleanup(3) must be called with only
  # one thread running. This check is just a safety measure -- it doesn't
  # cover all cases.
  assert threading.activeCount() == 1, \
         "Found more than one active thread when initializing pycURL"
82

83
  logging.info("Using PycURL %s", pycurl.version)
84

85
  pycurl.global_init(pycurl.GLOBAL_ALL)
86
87
88
89
90


def Shutdown():
  """Stops the module-global HTTP client manager.

91
92
  Must be called before quitting the program and while exactly one thread is
  running.
93
94

  """
95
96
97
98
  pycurl.global_cleanup()


def _ConfigRpcCurl(curl):
99
  noded_cert = str(pathutils.NODED_CERT_FILE)
100

101
102
103
104
105
106
107
108
  curl.setopt(pycurl.FOLLOWLOCATION, False)
  curl.setopt(pycurl.CAINFO, noded_cert)
  curl.setopt(pycurl.SSL_VERIFYHOST, 0)
  curl.setopt(pycurl.SSL_VERIFYPEER, True)
  curl.setopt(pycurl.SSLCERTTYPE, "PEM")
  curl.setopt(pycurl.SSLCERT, noded_cert)
  curl.setopt(pycurl.SSLKEYTYPE, "PEM")
  curl.setopt(pycurl.SSLKEY, noded_cert)
109
  curl.setopt(pycurl.CONNECTTIMEOUT, constants.RPC_CONNECT_TIMEOUT)
110
111


112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
def RunWithRPC(fn):
  """RPC-wrapper decorator.

  When applied to a function, it runs it with the RPC system
  initialized, and it shutsdown the system afterwards. This means the
  function must be called without RPC being initialized.

  """
  def wrapper(*args, **kwargs):
    Init()
    try:
      return fn(*args, **kwargs)
    finally:
      Shutdown()
  return wrapper


129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
def _Compress(data):
  """Compresses a string for transport over RPC.

  Small amounts of data are not compressed.

  @type data: str
  @param data: Data
  @rtype: tuple
  @return: Encoded data to send

  """
  # Small amounts of data are not compressed
  if len(data) < 512:
    return (constants.RPC_ENCODING_NONE, data)

  # Compress with zlib and encode in base64
  return (constants.RPC_ENCODING_ZLIB_BASE64,
          base64.b64encode(zlib.compress(data, 3)))


149
150
151
152
class RpcResult(object):
  """RPC Result class.

  This class holds an RPC result. It is needed since in multi-node
153
  calls we can't raise an exception just because one out of many
154
155
  failed, and therefore we use this class to encapsulate the result.

Michael Hanselmann's avatar
Michael Hanselmann committed
156
  @ivar data: the data payload, for successful results, or None
157
158
159
160
161
162
  @ivar call: the name of the RPC call
  @ivar node: the name of the node to which we made the call
  @ivar offline: whether the operation failed because the node was
      offline, as opposed to actual failure; offline=True will always
      imply failed=True, in order to allow simpler checking if
      the user doesn't care about the exact failure mode
163
  @ivar fail_msg: the error message if the call failed
164

165
  """
166
167
168
169
170
  def __init__(self, data=None, failed=False, offline=False,
               call=None, node=None):
    self.offline = offline
    self.call = call
    self.node = node
171

172
    if offline:
173
      self.fail_msg = "Node is marked offline"
174
      self.data = self.payload = None
175
    elif failed:
176
      self.fail_msg = self._EnsureErr(data)
177
      self.data = self.payload = None
178
179
    else:
      self.data = data
180
      if not isinstance(self.data, (tuple, list)):
181
182
        self.fail_msg = ("RPC layer error: invalid result type (%s)" %
                         type(self.data))
183
        self.payload = None
184
      elif len(data) != 2:
185
186
        self.fail_msg = ("RPC layer error: invalid result length (%d), "
                         "expected 2" % len(self.data))
187
        self.payload = None
188
      elif not self.data[0]:
189
        self.fail_msg = self._EnsureErr(self.data[1])
190
        self.payload = None
191
      else:
192
        # finally success
193
        self.fail_msg = None
194
195
        self.payload = data[1]

Iustin Pop's avatar
Iustin Pop committed
196
197
198
    for attr_name in ["call", "data", "fail_msg",
                      "node", "offline", "payload"]:
      assert hasattr(self, attr_name), "Missing attribute %s" % attr_name
199

200
201
202
203
204
205
206
  @staticmethod
  def _EnsureErr(val):
    """Helper to ensure we return a 'True' value for error."""
    if val:
      return val
    else:
      return "No error information"
207

208
  def Raise(self, msg, prereq=False, ecode=None):
209
210
211
212
213
214
    """If the result has failed, raise an OpExecError.

    This is used so that LU code doesn't have to check for each
    result, but instead can call this function.

    """
215
216
217
218
219
220
221
222
223
224
225
226
    if not self.fail_msg:
      return

    if not msg: # one could pass None for default message
      msg = ("Call '%s' to node '%s' has failed: %s" %
             (self.call, self.node, self.fail_msg))
    else:
      msg = "%s: %s" % (msg, self.fail_msg)
    if prereq:
      ec = errors.OpPrereqError
    else:
      ec = errors.OpExecError
227
    if ecode is not None:
Iustin Pop's avatar
Iustin Pop committed
228
      args = (msg, ecode)
229
230
    else:
      args = (msg, )
231
    raise ec(*args) # pylint: disable=W0142
232
233


234
def _SsconfResolver(ssconf_ips, node_list, _,
235
236
                    ssc=ssconf.SimpleStore,
                    nslookup_fn=netutils.Hostname.GetIP):
237
238
  """Return addresses for given node names.

239
240
  @type ssconf_ips: bool
  @param ssconf_ips: Use the ssconf IPs
241
242
243
244
  @type node_list: list
  @param node_list: List of node names
  @type ssc: class
  @param ssc: SimpleStore class that is used to obtain node->ip mappings
245
246
  @type nslookup_fn: callable
  @param nslookup_fn: function use to do NS lookup
247
248
  @rtype: list of tuple; (string, string)
  @return: List of tuples containing node name and IP address
249
250

  """
Manuel Franceschini's avatar
Manuel Franceschini committed
251
252
  ss = ssc()
  family = ss.GetPrimaryIPFamily()
253
254
255
256
257
258

  if ssconf_ips:
    iplist = ss.GetNodePrimaryIPList()
    ipmap = dict(entry.split() for entry in iplist)
  else:
    ipmap = {}
259
260

  result = []
261
  for node in node_list:
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
    ip = ipmap.get(node)
    if ip is None:
      ip = nslookup_fn(node, family=family)
    result.append((node, ip))

  return result


class _StaticResolver:
  def __init__(self, addresses):
    """Initializes this class.

    """
    self._addresses = addresses

277
  def __call__(self, hosts, _):
278
279
280
281
282
283
    """Returns static addresses for hosts.

    """
    assert len(hosts) == len(self._addresses)
    return zip(hosts, self._addresses)

284

285
def _CheckConfigNode(name, node, accept_offline_node):
286
  """Checks if a node is online.
287

288
289
290
291
  @type name: string
  @param name: Node name
  @type node: L{objects.Node} or None
  @param node: Node object
292

293
294
295
296
  """
  if node is None:
    # Depend on DNS for name resolution
    ip = name
297
  elif node.offline and not accept_offline_node:
298
299
300
301
    ip = _OFFLINE
  else:
    ip = node.primary_ip
  return (name, ip)
Iustin Pop's avatar
Iustin Pop committed
302
303


304
def _NodeConfigResolver(single_node_fn, all_nodes_fn, hosts, opts):
305
  """Calculate node addresses using configuration.
Iustin Pop's avatar
Iustin Pop committed
306
307

  """
308
309
310
311
  accept_offline_node = (opts is rpc_defs.ACCEPT_OFFLINE_NODE)

  assert accept_offline_node or opts is None, "Unknown option"

312
313
314
  # Special case for single-host lookups
  if len(hosts) == 1:
    (name, ) = hosts
315
    return [_CheckConfigNode(name, single_node_fn(name), accept_offline_node)]
316
317
  else:
    all_nodes = all_nodes_fn()
318
319
    return [_CheckConfigNode(name, all_nodes.get(name, None),
                             accept_offline_node)
320
321
322
323
            for name in hosts]


class _RpcProcessor:
324
  def __init__(self, resolver, port, lock_monitor_cb=None):
325
326
327
328
329
330
331
    """Initializes this class.

    @param resolver: callable accepting a list of hostnames, returning a list
      of tuples containing name and IP address (IP address can be the name or
      the special value L{_OFFLINE} to mark offline machines)
    @type port: int
    @param port: TCP port
332
    @param lock_monitor_cb: Callable for registering with lock monitor
333

Iustin Pop's avatar
Iustin Pop committed
334
    """
335
336
    self._resolver = resolver
    self._port = port
337
    self._lock_monitor_cb = lock_monitor_cb
338

339
340
341
  @staticmethod
  def _PrepareRequests(hosts, port, procedure, body, read_timeout):
    """Prepares requests by sorting offline hosts into separate list.
342

343
344
345
    @type body: dict
    @param body: a dictionary with per-host body data

346
347
348
    """
    results = {}
    requests = {}
349

350
351
352
353
354
355
    assert isinstance(body, dict)
    assert len(body) == len(hosts)
    assert compat.all(isinstance(v, str) for v in body.values())
    assert frozenset(map(compat.fst, hosts)) == frozenset(body.keys()), \
        "%s != %s" % (hosts, body.keys())

356
357
358
359
360
361
362
    for (name, ip) in hosts:
      if ip is _OFFLINE:
        # Node is marked as offline
        results[name] = RpcResult(node=name, offline=True, call=procedure)
      else:
        requests[name] = \
          http.client.HttpClientRequest(str(ip), port,
Iustin Pop's avatar
Iustin Pop committed
363
                                        http.HTTP_POST, str("/%s" % procedure),
364
                                        headers=_RPC_CLIENT_HEADERS,
365
                                        post_data=body[name],
366
                                        read_timeout=read_timeout,
367
368
                                        nicename="%s/%s" % (name, procedure),
                                        curl_config_fn=_ConfigRpcCurl)
Iustin Pop's avatar
Iustin Pop committed
369

370
371
372
373
374
    return (results, requests)

  @staticmethod
  def _CombineResults(results, requests, procedure):
    """Combines pre-computed results for offline hosts with actual call results.
375

Iustin Pop's avatar
Iustin Pop committed
376
    """
377
378
379
380
381
382
383
384
385
386
    for name, req in requests.items():
      if req.success and req.resp_status_code == http.HTTP_OK:
        host_result = RpcResult(data=serializer.LoadJson(req.resp_body),
                                node=name, call=procedure)
      else:
        # TODO: Better error reporting
        if req.error:
          msg = req.error
        else:
          msg = req.resp_body
387

388
389
390
        logging.error("RPC error in %s on node %s: %s", procedure, name, msg)
        host_result = RpcResult(data=msg, failed=True, node=name,
                                call=procedure)
391

392
      results[name] = host_result
393

394
    return results
Iustin Pop's avatar
Iustin Pop committed
395

396
  def __call__(self, hosts, procedure, body, read_timeout, resolver_opts,
397
               _req_process_fn=None):
398
    """Makes an RPC request to a number of nodes.
399

400
401
402
403
    @type hosts: sequence
    @param hosts: Hostnames
    @type procedure: string
    @param procedure: Request path
404
405
    @type body: dictionary
    @param body: dictionary with request bodies per host
406
407
    @type read_timeout: int or None
    @param read_timeout: Read timeout for request
408
409
    @rtype: dictionary
    @return: a dictionary mapping host names to rpc.RpcResult objects
Iustin Pop's avatar
Iustin Pop committed
410
411

    """
412
413
    assert read_timeout is not None, \
      "Missing RPC read timeout for procedure '%s'" % procedure
Iustin Pop's avatar
Iustin Pop committed
414

415
416
417
    if _req_process_fn is None:
      _req_process_fn = http.client.ProcessRequests

418
    (results, requests) = \
419
420
      self._PrepareRequests(self._resolver(hosts, resolver_opts), self._port,
                            procedure, body, read_timeout)
Iustin Pop's avatar
Iustin Pop committed
421

422
    _req_process_fn(requests.values(), lock_monitor_cb=self._lock_monitor_cb)
Iustin Pop's avatar
Iustin Pop committed
423

424
    assert not frozenset(results).intersection(requests)
425

426
    return self._CombineResults(results, requests, procedure)
Iustin Pop's avatar
Iustin Pop committed
427
428


429
class _RpcClientBase:
430
431
  def __init__(self, resolver, encoder_fn, lock_monitor_cb=None,
               _req_process_fn=None):
432
433
434
    """Initializes this class.

    """
435
436
437
438
    proc = _RpcProcessor(resolver,
                         netutils.GetDaemonPort(constants.NODED),
                         lock_monitor_cb=lock_monitor_cb)
    self._proc = compat.partial(proc, _req_process_fn=_req_process_fn)
439
440
441
442
443
444
445
446
447
448
449
450
    self._encoder = compat.partial(self._EncodeArg, encoder_fn)

  @staticmethod
  def _EncodeArg(encoder_fn, (argkind, value)):
    """Encode argument.

    """
    if argkind is None:
      return value
    else:
      return encoder_fn(argkind)(value)

451
  def _Call(self, cdef, node_list, args):
452
453
454
    """Entry point for automatically generated RPC wrappers.

    """
455
456
    (procedure, _, resolver_opts, timeout, argdefs,
     prep_fn, postproc_fn, _) = cdef
457
458
459
460
461

    if callable(timeout):
      read_timeout = timeout(args)
    else:
      read_timeout = timeout
462

463
464
465
466
467
    if callable(resolver_opts):
      req_resolver_opts = resolver_opts(args)
    else:
      req_resolver_opts = resolver_opts

468
469
470
    if len(args) != len(argdefs):
      raise errors.ProgrammerError("Number of passed arguments doesn't match")

471
472
473
474
475
476
477
478
479
    enc_args = map(self._encoder, zip(map(compat.snd, argdefs), args))
    if prep_fn is None:
      # for a no-op prep_fn, we serialise the body once, and then we
      # reuse it in the dictionary values
      body = serializer.DumpJson(enc_args)
      pnbody = dict((n, body) for n in node_list)
    else:
      # for a custom prep_fn, we pass the encoded arguments and the
      # node name to the prep_fn, and we serialise its return value
Michael Hanselmann's avatar
Michael Hanselmann committed
480
      assert callable(prep_fn)
481
482
483
      pnbody = dict((n, serializer.DumpJson(prep_fn(n, enc_args)))
                    for n in node_list)

484
485
    result = self._proc(node_list, procedure, pnbody, read_timeout,
                        req_resolver_opts)
486
487

    if postproc_fn:
488
489
      return dict(map(lambda (key, value): (key, postproc_fn(value)),
                      result.items()))
490
491
    else:
      return result
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517


def _ObjectToDict(value):
  """Converts an object to a dictionary.

  @note: See L{objects}.

  """
  return value.ToDict()


def _ObjectListToDict(value):
  """Converts a list of L{objects} to dictionaries.

  """
  return map(_ObjectToDict, value)


def _EncodeNodeToDiskDict(value):
  """Encodes a dictionary with node name as key and disk objects as values.

  """
  return dict((name, _ObjectListToDict(disks))
              for name, disks in value.items())


518
def _PrepareFileUpload(getents_fn, filename):
519
520
521
  """Loads a file and prepares it for an upload to nodes.

  """
522
523
524
  statcb = utils.FileStatHelper()
  data = _Compress(utils.ReadFile(filename, preread=statcb))
  st = statcb.st
525
526
527
528
529
530

  if getents_fn is None:
    getents_fn = runtime.GetEnts

  getents = getents_fn()

531
532
533
  virt_filename = vcluster.MakeVirtualPath(filename)

  return [virt_filename, data, st.st_mode, getents.LookupUid(st.st_uid),
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
          getents.LookupGid(st.st_gid), st.st_atime, st.st_mtime]


def _PrepareFinalizeExportDisks(snap_disks):
  """Encodes disks for finalizing export.

  """
  flat_disks = []

  for disk in snap_disks:
    if isinstance(disk, bool):
      flat_disks.append(disk)
    else:
      flat_disks.append(disk.ToDict())

  return flat_disks


def _EncodeImportExportIO((ieio, ieioargs)):
  """Encodes import/export I/O information.

  """
  if ieio == constants.IEIO_RAW_DISK:
    assert len(ieioargs) == 1
    return (ieio, (ieioargs[0].ToDict(), ))

  if ieio == constants.IEIO_SCRIPT:
    assert len(ieioargs) == 2
    return (ieio, (ieioargs[0].ToDict(), ieioargs[1]))

  return (ieio, ieioargs)


def _EncodeBlockdevRename(value):
  """Encodes information for renaming block devices.

  """
  return [(d.ToDict(), uid) for d, uid in value]


574
575
576
577
578
579
580
581
582
583
584
585
586
587
def MakeLegacyNodeInfo(data):
  """Formats the data returned by L{rpc.RpcRunner.call_node_info}.

  Converts the data into a single dictionary. This is fine for most use cases,
  but some require information from more than one volume group or hypervisor.

  """
  (bootid, (vg_info, ), (hv_info, )) = data

  return utils.JoinDisjointDicts(utils.JoinDisjointDicts(vg_info, hv_info), {
    "bootid": bootid,
    })


588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
def _AnnotateDParamsDRBD(disk, (drbd_params, data_params, meta_params)):
  """Annotates just DRBD disks layouts.

  """
  assert disk.dev_type == constants.LD_DRBD8

  disk.params = objects.FillDict(drbd_params, disk.params)
  (dev_data, dev_meta) = disk.children
  dev_data.params = objects.FillDict(data_params, dev_data.params)
  dev_meta.params = objects.FillDict(meta_params, dev_meta.params)

  return disk


def _AnnotateDParamsGeneric(disk, (params, )):
  """Generic disk parameter annotation routine.

  """
  assert disk.dev_type != constants.LD_DRBD8

  disk.params = objects.FillDict(params, disk.params)

  return disk


def AnnotateDiskParams(template, disks, disk_params):
  """Annotates the disk objects with the disk parameters.

  @param template: The disk template used
  @param disks: The list of disks objects to annotate
  @param disk_params: The disk paramaters for annotation
  @returns: A list of disk objects annotated

  """
  ld_params = objects.Disk.ComputeLDParams(template, disk_params)

  if template == constants.DT_DRBD8:
    annotation_fn = _AnnotateDParamsDRBD
  elif template == constants.DT_DISKLESS:
    annotation_fn = lambda disk, _: disk
  else:
    annotation_fn = _AnnotateDParamsGeneric

631
  return [annotation_fn(disk.Copy(), ld_params) for disk in disks]
632
633


634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
def _GetESFlag(cfg, nodename):
  ni = cfg.GetNodeInfo(nodename)
  if ni is None:
    raise errors.OpPrereqError("Invalid node name %s" % nodename,
                               errors.ECODE_NOENT)
  return cfg.GetNdParams(ni)[constants.ND_EXCLUSIVE_STORAGE]


def GetExclusiveStorageForNodeNames(cfg, nodelist):
  """Return the exclusive storage flag for all the given nodes.

  @type cfg: L{config.ConfigWriter}
  @param cfg: cluster configuration
  @type nodelist: list or tuple
  @param nodelist: node names for which to read the flag
  @rtype: dict
  @return: mapping from node names to exclusive storage flags
  @raise errors.OpPrereqError: if any given node name has no corresponding node

  """
  getflag = lambda n: _GetESFlag(cfg, n)
  flags = map(getflag, nodelist)
  return dict(zip(nodelist, flags))


659
660
661
662
663
664
665
666
667
668
669
670
671
672
#: Generic encoders
_ENCODERS = {
  rpc_defs.ED_OBJECT_DICT: _ObjectToDict,
  rpc_defs.ED_OBJECT_DICT_LIST: _ObjectListToDict,
  rpc_defs.ED_NODE_TO_DISK_DICT: _EncodeNodeToDiskDict,
  rpc_defs.ED_COMPRESS: _Compress,
  rpc_defs.ED_FINALIZE_EXPORT_DISKS: _PrepareFinalizeExportDisks,
  rpc_defs.ED_IMPEXP_IO: _EncodeImportExportIO,
  rpc_defs.ED_BLOCKDEV_RENAME: _EncodeBlockdevRename,
  }


class RpcRunner(_RpcClientBase,
                _generated_rpc.RpcClientDefault,
673
                _generated_rpc.RpcClientBootstrap,
674
                _generated_rpc.RpcClientDnsOnly,
675
                _generated_rpc.RpcClientConfig):
676
  """RPC runner class.
Iustin Pop's avatar
Iustin Pop committed
677

678
  """
679
  def __init__(self, cfg, lock_monitor_cb, _req_process_fn=None, _getents=None):
680
    """Initialized the RPC runner.
Iustin Pop's avatar
Iustin Pop committed
681

682
683
684
685
    @type cfg: L{config.ConfigWriter}
    @param cfg: Configuration
    @type lock_monitor_cb: callable
    @param lock_monitor_cb: Lock monitor callback
Iustin Pop's avatar
Iustin Pop committed
686

Iustin Pop's avatar
Iustin Pop committed
687
    """
688
    self._cfg = cfg
689
690
691
692

    encoders = _ENCODERS.copy()

    encoders.update({
693
      # Encoders requiring configuration object
694
      rpc_defs.ED_INST_DICT: self._InstDict,
695
      rpc_defs.ED_INST_DICT_HVP_BEP_DP: self._InstDictHvpBepDp,
696
      rpc_defs.ED_INST_DICT_OSP_DP: self._InstDictOspDp,
Dimitris Aragiorgis's avatar
Dimitris Aragiorgis committed
697
      rpc_defs.ED_NIC_DICT: self._NicDict,
698

699
700
701
702
      # Encoders annotating disk parameters
      rpc_defs.ED_DISKS_DICT_DP: self._DisksDictDP,
      rpc_defs.ED_SINGLE_DISK_DICT_DP: self._SingleDiskDictDP,

703
704
      # Encoders with special requirements
      rpc_defs.ED_FILE_DETAILS: compat.partial(_PrepareFileUpload, _getents),
705
706
707
      })

    # Resolver using configuration
708
709
    resolver = compat.partial(_NodeConfigResolver, cfg.GetNodeInfo,
                              cfg.GetAllNodesInfo)
710

711
712
713
714
    # Pylint doesn't recognize multiple inheritance properly, see
    # <http://www.logilab.org/ticket/36586> and
    # <http://www.logilab.org/ticket/35642>
    # pylint: disable=W0233
715
    _RpcClientBase.__init__(self, resolver, encoders.get,
716
717
                            lock_monitor_cb=lock_monitor_cb,
                            _req_process_fn=_req_process_fn)
718
    _generated_rpc.RpcClientConfig.__init__(self)
719
    _generated_rpc.RpcClientBootstrap.__init__(self)
720
    _generated_rpc.RpcClientDnsOnly.__init__(self)
721
722
    _generated_rpc.RpcClientDefault.__init__(self)

Dimitris Aragiorgis's avatar
Dimitris Aragiorgis committed
723
724
725
726
727
728
729
730
731
732
733
734
  def _NicDict(self, nic):
    """Convert the given nic to a dict and encapsulate netinfo

    """
    n = copy.deepcopy(nic)
    if n.network:
      net_uuid = self._cfg.LookupNetwork(n.network)
      if net_uuid:
        nobj = self._cfg.GetNetwork(net_uuid)
        n.netinfo = objects.Network.ToDict(nobj)
    return n.ToDict()

735
  def _InstDict(self, instance, hvp=None, bep=None, osp=None):
736
737
738
739
740
741
742
    """Convert the given instance to a dict.

    This is done via the instance's ToDict() method and additionally
    we fill the hvparams with the cluster defaults.

    @type instance: L{objects.Instance}
    @param instance: an Instance object
743
    @type hvp: dict or None
Michael Hanselmann's avatar
Michael Hanselmann committed
744
    @param hvp: a dictionary with overridden hypervisor parameters
745
    @type bep: dict or None
Michael Hanselmann's avatar
Michael Hanselmann committed
746
    @param bep: a dictionary with overridden backend parameters
747
    @type osp: dict or None
748
    @param osp: a dictionary with overridden os parameters
749
750
751
752
753
754
    @rtype: dict
    @return: the instance dict, with the hvparams filled with the
        cluster defaults

    """
    idict = instance.ToDict()
755
756
    cluster = self._cfg.GetClusterInfo()
    idict["hvparams"] = cluster.FillHV(instance)
757
758
    if hvp is not None:
      idict["hvparams"].update(hvp)
759
    idict["beparams"] = cluster.FillBE(instance)
760
761
    if bep is not None:
      idict["beparams"].update(bep)
762
763
764
    idict["osparams"] = cluster.SimpleFillOS(instance.os, instance.osparams)
    if osp is not None:
      idict["osparams"].update(osp)
Dimitris Aragiorgis's avatar
Dimitris Aragiorgis committed
765
    idict["disks"] = self._DisksDictDP((instance.disks, instance))
766
    for nic in idict["nics"]:
767
      nic["nicparams"] = objects.FillDict(
768
        cluster.nicparams[constants.PP_DEFAULT],
769
        nic["nicparams"])
Dimitris Aragiorgis's avatar
Dimitris Aragiorgis committed
770
771
772
773
774
775
      network = nic.get("network", None)
      if network:
        net_uuid = self._cfg.LookupNetwork(network)
        if net_uuid:
          nobj = self._cfg.GetNetwork(net_uuid)
          nic["netinfo"] = objects.Network.ToDict(nobj)
776
777
    return idict

778
  def _InstDictHvpBepDp(self, (instance, hvp, bep)):
779
780
781
782
783
    """Wrapper for L{_InstDict}.

    """
    return self._InstDict(instance, hvp=hvp, bep=bep)

784
  def _InstDictOspDp(self, (instance, osparams)):
785
786
787
    """Wrapper for L{_InstDict}.

    """
788
    return self._InstDict(instance, osp=osparams)
789

790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
  def _DisksDictDP(self, (disks, instance)):
    """Wrapper for L{AnnotateDiskParams}.

    """
    diskparams = self._cfg.GetInstanceDiskParams(instance)
    return [disk.ToDict()
            for disk in AnnotateDiskParams(instance.disk_template,
                                           disks, diskparams)]

  def _SingleDiskDictDP(self, (disk, instance)):
    """Wrapper for L{AnnotateDiskParams}.

    """
    (anno_disk,) = self._DisksDictDP(([disk], instance))
    return anno_disk

806

807
class JobQueueRunner(_RpcClientBase, _generated_rpc.RpcClientJobQueue):
808
809
810
811
812
813
814
815
  """RPC wrappers for job queue.

  """
  def __init__(self, context, address_list):
    """Initializes this class.

    """
    if address_list is None:
816
      resolver = compat.partial(_SsconfResolver, True)
817
818
819
820
    else:
      # Caller provided an address list
      resolver = _StaticResolver(address_list)

821
822
823
    _RpcClientBase.__init__(self, resolver, _ENCODERS.get,
                            lock_monitor_cb=context.glm.AddToLockMonitor)
    _generated_rpc.RpcClientJobQueue.__init__(self)
824
825


826
827
828
class BootstrapRunner(_RpcClientBase,
                      _generated_rpc.RpcClientBootstrap,
                      _generated_rpc.RpcClientDnsOnly):
829
830
831
832
833
834
835
  """RPC wrappers for bootstrapping.

  """
  def __init__(self):
    """Initializes this class.

    """
836
837
838
839
840
841
    # Pylint doesn't recognize multiple inheritance properly, see
    # <http://www.logilab.org/ticket/36586> and
    # <http://www.logilab.org/ticket/35642>
    # pylint: disable=W0233
    _RpcClientBase.__init__(self, compat.partial(_SsconfResolver, True),
                            _ENCODERS.get)
842
    _generated_rpc.RpcClientBootstrap.__init__(self)
843
844
845
846
847
848
849
850
851
852
853
854
855
856
    _generated_rpc.RpcClientDnsOnly.__init__(self)


class DnsOnlyRunner(_RpcClientBase, _generated_rpc.RpcClientDnsOnly):
  """RPC wrappers for calls using only DNS.

  """
  def __init__(self):
    """Initialize this class.

    """
    _RpcClientBase.__init__(self, compat.partial(_SsconfResolver, False),
                            _ENCODERS.get)
    _generated_rpc.RpcClientDnsOnly.__init__(self)
857

858

859
class ConfigRunner(_RpcClientBase, _generated_rpc.RpcClientConfig):
860
861
862
  """RPC wrappers for L{config}.

  """
863
864
  def __init__(self, context, address_list, _req_process_fn=None,
               _getents=None):
865
866
867
    """Initializes this class.

    """
868
869
870
871
872
    if context:
      lock_monitor_cb = context.glm.AddToLockMonitor
    else:
      lock_monitor_cb = None

873
    if address_list is None:
874
      resolver = compat.partial(_SsconfResolver, True)
875
876
877
878
    else:
      # Caller provided an address list
      resolver = _StaticResolver(address_list)

879
880
881
882
883
884
885
886
887
    encoders = _ENCODERS.copy()

    encoders.update({
      rpc_defs.ED_FILE_DETAILS: compat.partial(_PrepareFileUpload, _getents),
      })

    _RpcClientBase.__init__(self, resolver, encoders.get,
                            lock_monitor_cb=lock_monitor_cb,
                            _req_process_fn=_req_process_fn)
888
    _generated_rpc.RpcClientConfig.__init__(self)