Revision cd40dc53 lib/rpc.py

b/lib/rpc.py
47 47
from ganeti import ssconf
48 48
from ganeti import runtime
49 49
from ganeti import compat
50
from ganeti import rpc_defs
50 51

  
51 52
# Special module generated at build time
52 53
from ganeti import _generated_rpc
......
409 410
    return self._CombineResults(results, requests, procedure)
410 411

  
411 412

  
412
class RpcRunner(_generated_rpc.RpcClientDefault,
413
class _RpcClientBase:
414
  def __init__(self, resolver, encoder_fn, lock_monitor_cb=None):
415
    """Initializes this class.
416

  
417
    """
418
    self._proc = _RpcProcessor(resolver,
419
                               netutils.GetDaemonPort(constants.NODED),
420
                               lock_monitor_cb=lock_monitor_cb)
421
    self._encoder = compat.partial(self._EncodeArg, encoder_fn)
422

  
423
  @staticmethod
424
  def _EncodeArg(encoder_fn, (argkind, value)):
425
    """Encode argument.
426

  
427
    """
428
    if argkind is None:
429
      return value
430
    else:
431
      return encoder_fn(argkind)(value)
432

  
433
  def _Call(self, node_list, procedure, timeout, argdefs, args):
434
    """Entry point for automatically generated RPC wrappers.
435

  
436
    """
437
    assert len(args) == len(argdefs), "Wrong number of arguments"
438

  
439
    body = serializer.DumpJson(map(self._encoder, zip(argdefs, args)),
440
                               indent=False)
441

  
442
    return self._proc(node_list, procedure, body, read_timeout=timeout)
443

  
444

  
445
def _ObjectToDict(value):
446
  """Converts an object to a dictionary.
447

  
448
  @note: See L{objects}.
449

  
450
  """
451
  return value.ToDict()
452

  
453

  
454
def _ObjectListToDict(value):
455
  """Converts a list of L{objects} to dictionaries.
456

  
457
  """
458
  return map(_ObjectToDict, value)
459

  
460

  
461
def _EncodeNodeToDiskDict(value):
462
  """Encodes a dictionary with node name as key and disk objects as values.
463

  
464
  """
465
  return dict((name, _ObjectListToDict(disks))
466
              for name, disks in value.items())
467

  
468

  
469
def _PrepareFileUpload(filename):
470
  """Loads a file and prepares it for an upload to nodes.
471

  
472
  """
473
  data = _Compress(utils.ReadFile(filename))
474
  st = os.stat(filename)
475
  getents = runtime.GetEnts()
476
  return [filename, data, st.st_mode, getents.LookupUid(st.st_uid),
477
          getents.LookupGid(st.st_gid), st.st_atime, st.st_mtime]
478

  
479

  
480
def _PrepareFinalizeExportDisks(snap_disks):
481
  """Encodes disks for finalizing export.
482

  
483
  """
484
  flat_disks = []
485

  
486
  for disk in snap_disks:
487
    if isinstance(disk, bool):
488
      flat_disks.append(disk)
489
    else:
490
      flat_disks.append(disk.ToDict())
491

  
492
  return flat_disks
493

  
494

  
495
def _EncodeImportExportIO((ieio, ieioargs)):
496
  """Encodes import/export I/O information.
497

  
498
  """
499
  if ieio == constants.IEIO_RAW_DISK:
500
    assert len(ieioargs) == 1
501
    return (ieio, (ieioargs[0].ToDict(), ))
502

  
503
  if ieio == constants.IEIO_SCRIPT:
504
    assert len(ieioargs) == 2
505
    return (ieio, (ieioargs[0].ToDict(), ieioargs[1]))
506

  
507
  return (ieio, ieioargs)
508

  
509

  
510
def _EncodeBlockdevRename(value):
511
  """Encodes information for renaming block devices.
512

  
513
  """
514
  return [(d.ToDict(), uid) for d, uid in value]
515

  
516

  
517
#: Generic encoders
518
_ENCODERS = {
519
  rpc_defs.ED_OBJECT_DICT: _ObjectToDict,
520
  rpc_defs.ED_OBJECT_DICT_LIST: _ObjectListToDict,
521
  rpc_defs.ED_NODE_TO_DISK_DICT: _EncodeNodeToDiskDict,
522
  rpc_defs.ED_FILE_DETAILS: _PrepareFileUpload,
523
  rpc_defs.ED_COMPRESS: _Compress,
524
  rpc_defs.ED_FINALIZE_EXPORT_DISKS: _PrepareFinalizeExportDisks,
525
  rpc_defs.ED_IMPEXP_IO: _EncodeImportExportIO,
526
  rpc_defs.ED_BLOCKDEV_RENAME: _EncodeBlockdevRename,
527
  }
528

  
529

  
530
class RpcRunner(_RpcClientBase,
531
                _generated_rpc.RpcClientDefault,
413 532
                _generated_rpc.RpcClientBootstrap,
414 533
                _generated_rpc.RpcClientConfig):
415 534
  """RPC runner class.
......
422 541
    @param context: Ganeti context
423 542

  
424 543
    """
544
    self._cfg = context.cfg
545

  
546
    encoders = _ENCODERS.copy()
547

  
548
    # Add encoders requiring configuration object
549
    encoders.update({
550
      rpc_defs.ED_INST_DICT: self._InstDict,
551
      rpc_defs.ED_INST_DICT_HVP_BEP: self._InstDictHvpBep,
552
      rpc_defs.ED_INST_DICT_OSP: self._InstDictOsp,
553
      })
554

  
555
    # Resolver using configuration
556
    resolver = compat.partial(_NodeConfigResolver, self._cfg.GetNodeInfo,
557
                              self._cfg.GetAllNodesInfo)
558

  
425 559
    # Pylint doesn't recognize multiple inheritance properly, see
426 560
    # <http://www.logilab.org/ticket/36586> and
427 561
    # <http://www.logilab.org/ticket/35642>
428 562
    # pylint: disable=W0233
563
    _RpcClientBase.__init__(self, resolver, encoders.get,
564
                            lock_monitor_cb=context.glm.AddToLockMonitor)
429 565
    _generated_rpc.RpcClientConfig.__init__(self)
430 566
    _generated_rpc.RpcClientBootstrap.__init__(self)
431 567
    _generated_rpc.RpcClientDefault.__init__(self)
432 568

  
433
    self._cfg = context.cfg
434
    self._proc = _RpcProcessor(compat.partial(_NodeConfigResolver,
435
                                              self._cfg.GetNodeInfo,
436
                                              self._cfg.GetAllNodesInfo),
437
                               netutils.GetDaemonPort(constants.NODED),
438
                               lock_monitor_cb=context.glm.AddToLockMonitor)
439

  
440 569
  def _InstDict(self, instance, hvp=None, bep=None, osp=None):
441 570
    """Convert the given instance to a dict.
442 571

  
......
485 614
    """
486 615
    return self._InstDict(instance, osp=osparams)
487 616

  
488
  def _Call(self, node_list, procedure, timeout, args):
489
    """Entry point for automatically generated RPC wrappers.
490

  
491
    """
492
    body = serializer.DumpJson(args, indent=False)
493

  
494
    return self._proc(node_list, procedure, body, read_timeout=timeout)
495

  
496 617
  @staticmethod
497 618
  def _MigrationStatusPostProc(result):
498 619
    if not result.fail_msg and result.payload is not None:
......
531 652
    return result
532 653

  
533 654
  @staticmethod
534
  def _PrepareFinalizeExportDisks(snap_disks):
535
    flat_disks = []
536

  
537
    for disk in snap_disks:
538
      if isinstance(disk, bool):
539
        flat_disks.append(disk)
540
      else:
541
        flat_disks.append(disk.ToDict())
542

  
543
    return flat_disks
544

  
545
  @staticmethod
546 655
  def _ImpExpStatusPostProc(result):
547 656
    """Post-processor for import/export status.
548 657

  
......
564 673

  
565 674
    return result
566 675

  
567
  @staticmethod
568
  def _EncodeImportExportIO((ieio, ieioargs)):
569
    """Encodes import/export I/O information.
570

  
571
    """
572
    if ieio == constants.IEIO_RAW_DISK:
573
      assert len(ieioargs) == 1
574
      return (ieio, (ieioargs[0].ToDict(), ))
575

  
576
    if ieio == constants.IEIO_SCRIPT:
577
      assert len(ieioargs) == 2
578
      return (ieio, (ieioargs[0].ToDict(), ieioargs[1]))
579

  
580
    return (ieio, ieioargs)
581

  
582
  @staticmethod
583
  def _PrepareFileUpload(filename):
584
    """Loads a file and prepares it for an upload to nodes.
585

  
586
    """
587
    data = _Compress(utils.ReadFile(filename))
588
    st = os.stat(filename)
589
    getents = runtime.GetEnts()
590
    return [filename, data, st.st_mode, getents.LookupUid(st.st_uid),
591
            getents.LookupGid(st.st_gid), st.st_atime, st.st_mtime]
592

  
593 676
  #
594 677
  # Begin RPC calls
595 678
  #
......
605 688
                                read_timeout=int(duration + 5))
606 689

  
607 690

  
608
class JobQueueRunner(_generated_rpc.RpcClientJobQueue):
691
class JobQueueRunner(_RpcClientBase, _generated_rpc.RpcClientJobQueue):
609 692
  """RPC wrappers for job queue.
610 693

  
611 694
  """
612
  _Compress = staticmethod(_Compress)
613

  
614 695
  def __init__(self, context, address_list):
615 696
    """Initializes this class.
616 697

  
617 698
    """
618
    _generated_rpc.RpcClientJobQueue.__init__(self)
619

  
620 699
    if address_list is None:
621 700
      resolver = _SsconfResolver
622 701
    else:
623 702
      # Caller provided an address list
624 703
      resolver = _StaticResolver(address_list)
625 704

  
626
    self._proc = _RpcProcessor(resolver,
627
                               netutils.GetDaemonPort(constants.NODED),
628
                               lock_monitor_cb=context.glm.AddToLockMonitor)
629

  
630
  def _Call(self, node_list, procedure, timeout, args):
631
    """Entry point for automatically generated RPC wrappers.
632

  
633
    """
634
    body = serializer.DumpJson(args, indent=False)
635

  
636
    return self._proc(node_list, procedure, body, read_timeout=timeout)
705
    _RpcClientBase.__init__(self, resolver, _ENCODERS.get,
706
                            lock_monitor_cb=context.glm.AddToLockMonitor)
707
    _generated_rpc.RpcClientJobQueue.__init__(self)
637 708

  
638 709

  
639
class BootstrapRunner(_generated_rpc.RpcClientBootstrap):
710
class BootstrapRunner(_RpcClientBase, _generated_rpc.RpcClientBootstrap):
640 711
  """RPC wrappers for bootstrapping.
641 712

  
642 713
  """
......
644 715
    """Initializes this class.
645 716

  
646 717
    """
718
    _RpcClientBase.__init__(self, _SsconfResolver, _ENCODERS.get)
647 719
    _generated_rpc.RpcClientBootstrap.__init__(self)
648 720

  
649
    self._proc = _RpcProcessor(_SsconfResolver,
650
                               netutils.GetDaemonPort(constants.NODED))
651

  
652
  def _Call(self, node_list, procedure, timeout, args):
653
    """Entry point for automatically generated RPC wrappers.
654

  
655
    """
656
    body = serializer.DumpJson(args, indent=False)
657

  
658
    return self._proc(node_list, procedure, body, read_timeout=timeout)
659

  
660 721

  
661
class ConfigRunner(_generated_rpc.RpcClientConfig):
722
class ConfigRunner(_RpcClientBase, _generated_rpc.RpcClientConfig):
662 723
  """RPC wrappers for L{config}.
663 724

  
664 725
  """
665
  _PrepareFileUpload = \
666
    staticmethod(RpcRunner._PrepareFileUpload) # pylint: disable=W0212
667

  
668 726
  def __init__(self, address_list):
669 727
    """Initializes this class.
670 728

  
671 729
    """
672
    _generated_rpc.RpcClientConfig.__init__(self)
673

  
674 730
    if address_list is None:
675 731
      resolver = _SsconfResolver
676 732
    else:
677 733
      # Caller provided an address list
678 734
      resolver = _StaticResolver(address_list)
679 735

  
680
    self._proc = _RpcProcessor(resolver,
681
                               netutils.GetDaemonPort(constants.NODED))
682

  
683
  def _Call(self, node_list, procedure, timeout, args):
684
    """Entry point for automatically generated RPC wrappers.
685

  
686
    """
687
    body = serializer.DumpJson(args, indent=False)
688

  
689
    return self._proc(node_list, procedure, body, read_timeout=timeout)
736
    _RpcClientBase.__init__(self, resolver, _ENCODERS.get)
737
    _generated_rpc.RpcClientConfig.__init__(self)

Also available in: Unified diff