Revision 7ecd5e87

b/Makefile.am
316 316
	lib/cmdlib/node.py \
317 317
	lib/cmdlib/instance.py \
318 318
	lib/cmdlib/instance_utils.py \
319
	lib/cmdlib/backup.py \
319 320
	lib/cmdlib/tags.py \
320 321
	lib/cmdlib/network.py \
321 322
	lib/cmdlib/test.py
b/lib/cmdlib/__init__.py
30 30

  
31 31
import time
32 32
import logging
33
import OpenSSL
34 33

  
35 34
from ganeti import utils
36 35
from ganeti import errors
37 36
from ganeti import locking
38 37
from ganeti import constants
39 38
from ganeti import compat
40
from ganeti import masterd
41 39
from ganeti import query
42 40
from ganeti import qlang
43 41

  
......
83 81
  LUInstanceReinstall, LUInstanceReboot, LUInstanceConsole, \
84 82
  LUInstanceFailover, LUInstanceMigrate, LUInstanceMultiAlloc, \
85 83
  LUInstanceSetParams, LUInstanceChangeGroup
84
from ganeti.cmdlib.backup import _ExportQuery, LUBackupQuery, \
85
  LUBackupPrepare, LUBackupExport, LUBackupRemove
86 86
from ganeti.cmdlib.tags import LUTagsGet, LUTagsSearch, LUTagsSet, LUTagsDel
87 87
from ganeti.cmdlib.network import LUNetworkAdd, LUNetworkRemove, \
88 88
  LUNetworkSetParams, _NetworkQuery, LUNetworkQuery, LUNetworkConnect, \
......
629 629
    return query.QueryFields(self.qcls.FIELDS, self.op.fields)
630 630

  
631 631

  
632
class LUBackupQuery(NoHooksLU):
633
  """Query the exports list
634

  
635
  """
636
  REQ_BGL = False
637

  
638
  def CheckArguments(self):
639
    self.expq = _ExportQuery(qlang.MakeSimpleFilter("node", self.op.nodes),
640
                             ["node", "export"], self.op.use_locking)
641

  
642
  def ExpandNames(self):
643
    self.expq.ExpandNames(self)
644

  
645
  def DeclareLocks(self, level):
646
    self.expq.DeclareLocks(self, level)
647

  
648
  def Exec(self, feedback_fn):
649
    result = {}
650

  
651
    for (node, expname) in self.expq.OldStyleQuery(self):
652
      if expname is None:
653
        result[node] = False
654
      else:
655
        result.setdefault(node, []).append(expname)
656

  
657
    return result
658

  
659

  
660
class _ExportQuery(_QueryBase):
661
  FIELDS = query.EXPORT_FIELDS
662

  
663
  #: The node name is not a unique key for this query
664
  SORT_FIELD = "node"
665

  
666
  def ExpandNames(self, lu):
667
    lu.needed_locks = {}
668

  
669
    # The following variables interact with _QueryBase._GetNames
670
    if self.names:
671
      self.wanted = _GetWantedNodes(lu, self.names)
672
    else:
673
      self.wanted = locking.ALL_SET
674

  
675
    self.do_locking = self.use_locking
676

  
677
    if self.do_locking:
678
      lu.share_locks = _ShareAll()
679
      lu.needed_locks = {
680
        locking.LEVEL_NODE: self.wanted,
681
        }
682

  
683
      if not self.names:
684
        lu.needed_locks[locking.LEVEL_NODE_ALLOC] = locking.ALL_SET
685

  
686
  def DeclareLocks(self, lu, level):
687
    pass
688

  
689
  def _GetQueryData(self, lu):
690
    """Computes the list of nodes and their attributes.
691

  
692
    """
693
    # Locking is not used
694
    # TODO
695
    assert not (compat.any(lu.glm.is_owned(level)
696
                           for level in locking.LEVELS
697
                           if level != locking.LEVEL_CLUSTER) or
698
                self.do_locking or self.use_locking)
699

  
700
    nodes = self._GetNames(lu, lu.cfg.GetNodeList(), locking.LEVEL_NODE)
701

  
702
    result = []
703

  
704
    for (node, nres) in lu.rpc.call_export_list(nodes).items():
705
      if nres.fail_msg:
706
        result.append((node, None))
707
      else:
708
        result.extend((node, expname) for expname in nres.payload)
709

  
710
    return result
711

  
712

  
713
class LUBackupPrepare(NoHooksLU):
714
  """Prepares an instance for an export and returns useful information.
715

  
716
  """
717
  REQ_BGL = False
718

  
719
  def ExpandNames(self):
720
    self._ExpandAndLockInstance()
721

  
722
  def CheckPrereq(self):
723
    """Check prerequisites.
724

  
725
    """
726
    instance_name = self.op.instance_name
727

  
728
    self.instance = self.cfg.GetInstanceInfo(instance_name)
729
    assert self.instance is not None, \
730
          "Cannot retrieve locked instance %s" % self.op.instance_name
731
    _CheckNodeOnline(self, self.instance.primary_node)
732

  
733
    self._cds = _GetClusterDomainSecret()
734

  
735
  def Exec(self, feedback_fn):
736
    """Prepares an instance for an export.
737

  
738
    """
739
    instance = self.instance
740

  
741
    if self.op.mode == constants.EXPORT_MODE_REMOTE:
742
      salt = utils.GenerateSecret(8)
743

  
744
      feedback_fn("Generating X509 certificate on %s" % instance.primary_node)
745
      result = self.rpc.call_x509_cert_create(instance.primary_node,
746
                                              constants.RIE_CERT_VALIDITY)
747
      result.Raise("Can't create X509 key and certificate on %s" % result.node)
748

  
749
      (name, cert_pem) = result.payload
750

  
751
      cert = OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_PEM,
752
                                             cert_pem)
753

  
754
      return {
755
        "handshake": masterd.instance.ComputeRemoteExportHandshake(self._cds),
756
        "x509_key_name": (name, utils.Sha1Hmac(self._cds, name, salt=salt),
757
                          salt),
758
        "x509_ca": utils.SignX509Certificate(cert, self._cds, salt),
759
        }
760

  
761
    return None
762

  
763

  
764
class LUBackupExport(LogicalUnit):
765
  """Export an instance to an image in the cluster.
766

  
767
  """
768
  HPATH = "instance-export"
769
  HTYPE = constants.HTYPE_INSTANCE
770
  REQ_BGL = False
771

  
772
  def CheckArguments(self):
773
    """Check the arguments.
774

  
775
    """
776
    self.x509_key_name = self.op.x509_key_name
777
    self.dest_x509_ca_pem = self.op.destination_x509_ca
778

  
779
    if self.op.mode == constants.EXPORT_MODE_REMOTE:
780
      if not self.x509_key_name:
781
        raise errors.OpPrereqError("Missing X509 key name for encryption",
782
                                   errors.ECODE_INVAL)
783

  
784
      if not self.dest_x509_ca_pem:
785
        raise errors.OpPrereqError("Missing destination X509 CA",
786
                                   errors.ECODE_INVAL)
787

  
788
  def ExpandNames(self):
789
    self._ExpandAndLockInstance()
790

  
791
    # Lock all nodes for local exports
792
    if self.op.mode == constants.EXPORT_MODE_LOCAL:
793
      # FIXME: lock only instance primary and destination node
794
      #
795
      # Sad but true, for now we have do lock all nodes, as we don't know where
796
      # the previous export might be, and in this LU we search for it and
797
      # remove it from its current node. In the future we could fix this by:
798
      #  - making a tasklet to search (share-lock all), then create the
799
      #    new one, then one to remove, after
800
      #  - removing the removal operation altogether
801
      self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
802

  
803
      # Allocations should be stopped while this LU runs with node locks, but
804
      # it doesn't have to be exclusive
805
      self.share_locks[locking.LEVEL_NODE_ALLOC] = 1
806
      self.needed_locks[locking.LEVEL_NODE_ALLOC] = locking.ALL_SET
807

  
808
  def DeclareLocks(self, level):
809
    """Last minute lock declaration."""
810
    # All nodes are locked anyway, so nothing to do here.
811

  
812
  def BuildHooksEnv(self):
813
    """Build hooks env.
814

  
815
    This will run on the master, primary node and target node.
816

  
817
    """
818
    env = {
819
      "EXPORT_MODE": self.op.mode,
820
      "EXPORT_NODE": self.op.target_node,
821
      "EXPORT_DO_SHUTDOWN": self.op.shutdown,
822
      "SHUTDOWN_TIMEOUT": self.op.shutdown_timeout,
823
      # TODO: Generic function for boolean env variables
824
      "REMOVE_INSTANCE": str(bool(self.op.remove_instance)),
825
      }
826

  
827
    env.update(_BuildInstanceHookEnvByObject(self, self.instance))
828

  
829
    return env
830

  
831
  def BuildHooksNodes(self):
832
    """Build hooks nodes.
833

  
834
    """
835
    nl = [self.cfg.GetMasterNode(), self.instance.primary_node]
836

  
837
    if self.op.mode == constants.EXPORT_MODE_LOCAL:
838
      nl.append(self.op.target_node)
839

  
840
    return (nl, nl)
841

  
842
  def CheckPrereq(self):
843
    """Check prerequisites.
844

  
845
    This checks that the instance and node names are valid.
846

  
847
    """
848
    instance_name = self.op.instance_name
849

  
850
    self.instance = self.cfg.GetInstanceInfo(instance_name)
851
    assert self.instance is not None, \
852
          "Cannot retrieve locked instance %s" % self.op.instance_name
853
    _CheckNodeOnline(self, self.instance.primary_node)
854

  
855
    if (self.op.remove_instance and
856
        self.instance.admin_state == constants.ADMINST_UP and
857
        not self.op.shutdown):
858
      raise errors.OpPrereqError("Can not remove instance without shutting it"
859
                                 " down before", errors.ECODE_STATE)
860

  
861
    if self.op.mode == constants.EXPORT_MODE_LOCAL:
862
      self.op.target_node = _ExpandNodeName(self.cfg, self.op.target_node)
863
      self.dst_node = self.cfg.GetNodeInfo(self.op.target_node)
864
      assert self.dst_node is not None
865

  
866
      _CheckNodeOnline(self, self.dst_node.name)
867
      _CheckNodeNotDrained(self, self.dst_node.name)
868

  
869
      self._cds = None
870
      self.dest_disk_info = None
871
      self.dest_x509_ca = None
872

  
873
    elif self.op.mode == constants.EXPORT_MODE_REMOTE:
874
      self.dst_node = None
875

  
876
      if len(self.op.target_node) != len(self.instance.disks):
877
        raise errors.OpPrereqError(("Received destination information for %s"
878
                                    " disks, but instance %s has %s disks") %
879
                                   (len(self.op.target_node), instance_name,
880
                                    len(self.instance.disks)),
881
                                   errors.ECODE_INVAL)
882

  
883
      cds = _GetClusterDomainSecret()
884

  
885
      # Check X509 key name
886
      try:
887
        (key_name, hmac_digest, hmac_salt) = self.x509_key_name
888
      except (TypeError, ValueError), err:
889
        raise errors.OpPrereqError("Invalid data for X509 key name: %s" % err,
890
                                   errors.ECODE_INVAL)
891

  
892
      if not utils.VerifySha1Hmac(cds, key_name, hmac_digest, salt=hmac_salt):
893
        raise errors.OpPrereqError("HMAC for X509 key name is wrong",
894
                                   errors.ECODE_INVAL)
895

  
896
      # Load and verify CA
897
      try:
898
        (cert, _) = utils.LoadSignedX509Certificate(self.dest_x509_ca_pem, cds)
899
      except OpenSSL.crypto.Error, err:
900
        raise errors.OpPrereqError("Unable to load destination X509 CA (%s)" %
901
                                   (err, ), errors.ECODE_INVAL)
902

  
903
      (errcode, msg) = utils.VerifyX509Certificate(cert, None, None)
904
      if errcode is not None:
905
        raise errors.OpPrereqError("Invalid destination X509 CA (%s)" %
906
                                   (msg, ), errors.ECODE_INVAL)
907

  
908
      self.dest_x509_ca = cert
909

  
910
      # Verify target information
911
      disk_info = []
912
      for idx, disk_data in enumerate(self.op.target_node):
913
        try:
914
          (host, port, magic) = \
915
            masterd.instance.CheckRemoteExportDiskInfo(cds, idx, disk_data)
916
        except errors.GenericError, err:
917
          raise errors.OpPrereqError("Target info for disk %s: %s" %
918
                                     (idx, err), errors.ECODE_INVAL)
919

  
920
        disk_info.append((host, port, magic))
921

  
922
      assert len(disk_info) == len(self.op.target_node)
923
      self.dest_disk_info = disk_info
924

  
925
    else:
926
      raise errors.ProgrammerError("Unhandled export mode %r" %
927
                                   self.op.mode)
928

  
929
    # instance disk type verification
930
    # TODO: Implement export support for file-based disks
931
    for disk in self.instance.disks:
932
      if disk.dev_type == constants.LD_FILE:
933
        raise errors.OpPrereqError("Export not supported for instances with"
934
                                   " file-based disks", errors.ECODE_INVAL)
935

  
936
  def _CleanupExports(self, feedback_fn):
937
    """Removes exports of current instance from all other nodes.
938

  
939
    If an instance in a cluster with nodes A..D was exported to node C, its
940
    exports will be removed from the nodes A, B and D.
941

  
942
    """
943
    assert self.op.mode != constants.EXPORT_MODE_REMOTE
944

  
945
    nodelist = self.cfg.GetNodeList()
946
    nodelist.remove(self.dst_node.name)
947

  
948
    # on one-node clusters nodelist will be empty after the removal
949
    # if we proceed the backup would be removed because OpBackupQuery
950
    # substitutes an empty list with the full cluster node list.
951
    iname = self.instance.name
952
    if nodelist:
953
      feedback_fn("Removing old exports for instance %s" % iname)
954
      exportlist = self.rpc.call_export_list(nodelist)
955
      for node in exportlist:
956
        if exportlist[node].fail_msg:
957
          continue
958
        if iname in exportlist[node].payload:
959
          msg = self.rpc.call_export_remove(node, iname).fail_msg
960
          if msg:
961
            self.LogWarning("Could not remove older export for instance %s"
962
                            " on node %s: %s", iname, node, msg)
963

  
964
  def Exec(self, feedback_fn):
965
    """Export an instance to an image in the cluster.
966

  
967
    """
968
    assert self.op.mode in constants.EXPORT_MODES
969

  
970
    instance = self.instance
971
    src_node = instance.primary_node
972

  
973
    if self.op.shutdown:
974
      # shutdown the instance, but not the disks
975
      feedback_fn("Shutting down instance %s" % instance.name)
976
      result = self.rpc.call_instance_shutdown(src_node, instance,
977
                                               self.op.shutdown_timeout,
978
                                               self.op.reason)
979
      # TODO: Maybe ignore failures if ignore_remove_failures is set
980
      result.Raise("Could not shutdown instance %s on"
981
                   " node %s" % (instance.name, src_node))
982

  
983
    # set the disks ID correctly since call_instance_start needs the
984
    # correct drbd minor to create the symlinks
985
    for disk in instance.disks:
986
      self.cfg.SetDiskID(disk, src_node)
987

  
988
    activate_disks = (instance.admin_state != constants.ADMINST_UP)
989

  
990
    if activate_disks:
991
      # Activate the instance disks if we'exporting a stopped instance
992
      feedback_fn("Activating disks for %s" % instance.name)
993
      _StartInstanceDisks(self, instance, None)
994

  
995
    try:
996
      helper = masterd.instance.ExportInstanceHelper(self, feedback_fn,
997
                                                     instance)
998

  
999
      helper.CreateSnapshots()
1000
      try:
1001
        if (self.op.shutdown and
1002
            instance.admin_state == constants.ADMINST_UP and
1003
            not self.op.remove_instance):
1004
          assert not activate_disks
1005
          feedback_fn("Starting instance %s" % instance.name)
1006
          result = self.rpc.call_instance_start(src_node,
1007
                                                (instance, None, None), False,
1008
                                                 self.op.reason)
1009
          msg = result.fail_msg
1010
          if msg:
1011
            feedback_fn("Failed to start instance: %s" % msg)
1012
            _ShutdownInstanceDisks(self, instance)
1013
            raise errors.OpExecError("Could not start instance: %s" % msg)
1014

  
1015
        if self.op.mode == constants.EXPORT_MODE_LOCAL:
1016
          (fin_resu, dresults) = helper.LocalExport(self.dst_node)
1017
        elif self.op.mode == constants.EXPORT_MODE_REMOTE:
1018
          connect_timeout = constants.RIE_CONNECT_TIMEOUT
1019
          timeouts = masterd.instance.ImportExportTimeouts(connect_timeout)
1020

  
1021
          (key_name, _, _) = self.x509_key_name
1022

  
1023
          dest_ca_pem = \
1024
            OpenSSL.crypto.dump_certificate(OpenSSL.crypto.FILETYPE_PEM,
1025
                                            self.dest_x509_ca)
1026

  
1027
          (fin_resu, dresults) = helper.RemoteExport(self.dest_disk_info,
1028
                                                     key_name, dest_ca_pem,
1029
                                                     timeouts)
1030
      finally:
1031
        helper.Cleanup()
1032

  
1033
      # Check for backwards compatibility
1034
      assert len(dresults) == len(instance.disks)
1035
      assert compat.all(isinstance(i, bool) for i in dresults), \
1036
             "Not all results are boolean: %r" % dresults
1037

  
1038
    finally:
1039
      if activate_disks:
1040
        feedback_fn("Deactivating disks for %s" % instance.name)
1041
        _ShutdownInstanceDisks(self, instance)
1042

  
1043
    if not (compat.all(dresults) and fin_resu):
1044
      failures = []
1045
      if not fin_resu:
1046
        failures.append("export finalization")
1047
      if not compat.all(dresults):
1048
        fdsk = utils.CommaJoin(idx for (idx, dsk) in enumerate(dresults)
1049
                               if not dsk)
1050
        failures.append("disk export: disk(s) %s" % fdsk)
1051

  
1052
      raise errors.OpExecError("Export failed, errors in %s" %
1053
                               utils.CommaJoin(failures))
1054

  
1055
    # At this point, the export was successful, we can cleanup/finish
1056

  
1057
    # Remove instance if requested
1058
    if self.op.remove_instance:
1059
      feedback_fn("Removing instance %s" % instance.name)
1060
      _RemoveInstance(self, feedback_fn, instance,
1061
                      self.op.ignore_remove_failures)
1062

  
1063
    if self.op.mode == constants.EXPORT_MODE_LOCAL:
1064
      self._CleanupExports(feedback_fn)
1065

  
1066
    return fin_resu, dresults
1067

  
1068

  
1069
class LUBackupRemove(NoHooksLU):
1070
  """Remove exports related to the named instance.
1071

  
1072
  """
1073
  REQ_BGL = False
1074

  
1075
  def ExpandNames(self):
1076
    self.needed_locks = {
1077
      # We need all nodes to be locked in order for RemoveExport to work, but
1078
      # we don't need to lock the instance itself, as nothing will happen to it
1079
      # (and we can remove exports also for a removed instance)
1080
      locking.LEVEL_NODE: locking.ALL_SET,
1081

  
1082
      # Removing backups is quick, so blocking allocations is justified
1083
      locking.LEVEL_NODE_ALLOC: locking.ALL_SET,
1084
      }
1085

  
1086
    # Allocations should be stopped while this LU runs with node locks, but it
1087
    # doesn't have to be exclusive
1088
    self.share_locks[locking.LEVEL_NODE_ALLOC] = 1
1089

  
1090
  def Exec(self, feedback_fn):
1091
    """Remove any export.
1092

  
1093
    """
1094
    instance_name = self.cfg.ExpandInstanceName(self.op.instance_name)
1095
    # If the instance was not found we'll try with the name that was passed in.
1096
    # This will only work if it was an FQDN, though.
1097
    fqdn_warn = False
1098
    if not instance_name:
1099
      fqdn_warn = True
1100
      instance_name = self.op.instance_name
1101

  
1102
    locked_nodes = self.owned_locks(locking.LEVEL_NODE)
1103
    exportlist = self.rpc.call_export_list(locked_nodes)
1104
    found = False
1105
    for node in exportlist:
1106
      msg = exportlist[node].fail_msg
1107
      if msg:
1108
        self.LogWarning("Failed to query node %s (continuing): %s", node, msg)
1109
        continue
1110
      if instance_name in exportlist[node].payload:
1111
        found = True
1112
        result = self.rpc.call_export_remove(node, instance_name)
1113
        msg = result.fail_msg
1114
        if msg:
1115
          logging.error("Could not remove export for instance %s"
1116
                        " on node %s: %s", instance_name, node, msg)
1117

  
1118
    if fqdn_warn and not found:
1119
      feedback_fn("Export not found. If trying to remove an export belonging"
1120
                  " to a deleted instance please use its Fully Qualified"
1121
                  " Domain Name.")
1122

  
1123

  
1124 632
class LURestrictedCommand(NoHooksLU):
1125 633
  """Logical unit for executing restricted commands.
1126 634

  
b/lib/cmdlib/backup.py
1
#
2
#
3

  
4
# Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011, 2012, 2013 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

  
21

  
22
"""Logical units dealing with backup operations."""
23

  
24
import OpenSSL
25
import logging
26

  
27
from ganeti import compat
28
from ganeti import constants
29
from ganeti import errors
30
from ganeti import locking
31
from ganeti import masterd
32
from ganeti import qlang
33
from ganeti import query
34
from ganeti import utils
35

  
36
from ganeti.cmdlib.base import _QueryBase, NoHooksLU, LogicalUnit
37
from ganeti.cmdlib.common import _GetWantedNodes, _ShareAll, \
38
  _CheckNodeOnline, _ExpandNodeName
39
from ganeti.cmdlib.instance_utils import _GetClusterDomainSecret, \
40
  _BuildInstanceHookEnvByObject, _CheckNodeNotDrained, _StartInstanceDisks, \
41
  _ShutdownInstanceDisks, _RemoveInstance
42

  
43

  
44
class _ExportQuery(_QueryBase):
45
  FIELDS = query.EXPORT_FIELDS
46

  
47
  #: The node name is not a unique key for this query
48
  SORT_FIELD = "node"
49

  
50
  def ExpandNames(self, lu):
51
    lu.needed_locks = {}
52

  
53
    # The following variables interact with _QueryBase._GetNames
54
    if self.names:
55
      self.wanted = _GetWantedNodes(lu, self.names)
56
    else:
57
      self.wanted = locking.ALL_SET
58

  
59
    self.do_locking = self.use_locking
60

  
61
    if self.do_locking:
62
      lu.share_locks = _ShareAll()
63
      lu.needed_locks = {
64
        locking.LEVEL_NODE: self.wanted,
65
        }
66

  
67
      if not self.names:
68
        lu.needed_locks[locking.LEVEL_NODE_ALLOC] = locking.ALL_SET
69

  
70
  def DeclareLocks(self, lu, level):
71
    pass
72

  
73
  def _GetQueryData(self, lu):
74
    """Computes the list of nodes and their attributes.
75

  
76
    """
77
    # Locking is not used
78
    # TODO
79
    assert not (compat.any(lu.glm.is_owned(level)
80
                           for level in locking.LEVELS
81
                           if level != locking.LEVEL_CLUSTER) or
82
                self.do_locking or self.use_locking)
83

  
84
    nodes = self._GetNames(lu, lu.cfg.GetNodeList(), locking.LEVEL_NODE)
85

  
86
    result = []
87

  
88
    for (node, nres) in lu.rpc.call_export_list(nodes).items():
89
      if nres.fail_msg:
90
        result.append((node, None))
91
      else:
92
        result.extend((node, expname) for expname in nres.payload)
93

  
94
    return result
95

  
96

  
97
class LUBackupQuery(NoHooksLU):
98
  """Query the exports list
99

  
100
  """
101
  REQ_BGL = False
102

  
103
  def CheckArguments(self):
104
    self.expq = _ExportQuery(qlang.MakeSimpleFilter("node", self.op.nodes),
105
                             ["node", "export"], self.op.use_locking)
106

  
107
  def ExpandNames(self):
108
    self.expq.ExpandNames(self)
109

  
110
  def DeclareLocks(self, level):
111
    self.expq.DeclareLocks(self, level)
112

  
113
  def Exec(self, feedback_fn):
114
    result = {}
115

  
116
    for (node, expname) in self.expq.OldStyleQuery(self):
117
      if expname is None:
118
        result[node] = False
119
      else:
120
        result.setdefault(node, []).append(expname)
121

  
122
    return result
123

  
124

  
125
class LUBackupPrepare(NoHooksLU):
126
  """Prepares an instance for an export and returns useful information.
127

  
128
  """
129
  REQ_BGL = False
130

  
131
  def ExpandNames(self):
132
    self._ExpandAndLockInstance()
133

  
134
  def CheckPrereq(self):
135
    """Check prerequisites.
136

  
137
    """
138
    instance_name = self.op.instance_name
139

  
140
    self.instance = self.cfg.GetInstanceInfo(instance_name)
141
    assert self.instance is not None, \
142
          "Cannot retrieve locked instance %s" % self.op.instance_name
143
    _CheckNodeOnline(self, self.instance.primary_node)
144

  
145
    self._cds = _GetClusterDomainSecret()
146

  
147
  def Exec(self, feedback_fn):
148
    """Prepares an instance for an export.
149

  
150
    """
151
    instance = self.instance
152

  
153
    if self.op.mode == constants.EXPORT_MODE_REMOTE:
154
      salt = utils.GenerateSecret(8)
155

  
156
      feedback_fn("Generating X509 certificate on %s" % instance.primary_node)
157
      result = self.rpc.call_x509_cert_create(instance.primary_node,
158
                                              constants.RIE_CERT_VALIDITY)
159
      result.Raise("Can't create X509 key and certificate on %s" % result.node)
160

  
161
      (name, cert_pem) = result.payload
162

  
163
      cert = OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_PEM,
164
                                             cert_pem)
165

  
166
      return {
167
        "handshake": masterd.instance.ComputeRemoteExportHandshake(self._cds),
168
        "x509_key_name": (name, utils.Sha1Hmac(self._cds, name, salt=salt),
169
                          salt),
170
        "x509_ca": utils.SignX509Certificate(cert, self._cds, salt),
171
        }
172

  
173
    return None
174

  
175

  
176
class LUBackupExport(LogicalUnit):
177
  """Export an instance to an image in the cluster.
178

  
179
  """
180
  HPATH = "instance-export"
181
  HTYPE = constants.HTYPE_INSTANCE
182
  REQ_BGL = False
183

  
184
  def CheckArguments(self):
185
    """Check the arguments.
186

  
187
    """
188
    self.x509_key_name = self.op.x509_key_name
189
    self.dest_x509_ca_pem = self.op.destination_x509_ca
190

  
191
    if self.op.mode == constants.EXPORT_MODE_REMOTE:
192
      if not self.x509_key_name:
193
        raise errors.OpPrereqError("Missing X509 key name for encryption",
194
                                   errors.ECODE_INVAL)
195

  
196
      if not self.dest_x509_ca_pem:
197
        raise errors.OpPrereqError("Missing destination X509 CA",
198
                                   errors.ECODE_INVAL)
199

  
200
  def ExpandNames(self):
201
    self._ExpandAndLockInstance()
202

  
203
    # Lock all nodes for local exports
204
    if self.op.mode == constants.EXPORT_MODE_LOCAL:
205
      # FIXME: lock only instance primary and destination node
206
      #
207
      # Sad but true, for now we have do lock all nodes, as we don't know where
208
      # the previous export might be, and in this LU we search for it and
209
      # remove it from its current node. In the future we could fix this by:
210
      #  - making a tasklet to search (share-lock all), then create the
211
      #    new one, then one to remove, after
212
      #  - removing the removal operation altogether
213
      self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
214

  
215
      # Allocations should be stopped while this LU runs with node locks, but
216
      # it doesn't have to be exclusive
217
      self.share_locks[locking.LEVEL_NODE_ALLOC] = 1
218
      self.needed_locks[locking.LEVEL_NODE_ALLOC] = locking.ALL_SET
219

  
220
  def DeclareLocks(self, level):
221
    """Last minute lock declaration."""
222
    # All nodes are locked anyway, so nothing to do here.
223

  
224
  def BuildHooksEnv(self):
225
    """Build hooks env.
226

  
227
    This will run on the master, primary node and target node.
228

  
229
    """
230
    env = {
231
      "EXPORT_MODE": self.op.mode,
232
      "EXPORT_NODE": self.op.target_node,
233
      "EXPORT_DO_SHUTDOWN": self.op.shutdown,
234
      "SHUTDOWN_TIMEOUT": self.op.shutdown_timeout,
235
      # TODO: Generic function for boolean env variables
236
      "REMOVE_INSTANCE": str(bool(self.op.remove_instance)),
237
      }
238

  
239
    env.update(_BuildInstanceHookEnvByObject(self, self.instance))
240

  
241
    return env
242

  
243
  def BuildHooksNodes(self):
244
    """Build hooks nodes.
245

  
246
    """
247
    nl = [self.cfg.GetMasterNode(), self.instance.primary_node]
248

  
249
    if self.op.mode == constants.EXPORT_MODE_LOCAL:
250
      nl.append(self.op.target_node)
251

  
252
    return (nl, nl)
253

  
254
  def CheckPrereq(self):
255
    """Check prerequisites.
256

  
257
    This checks that the instance and node names are valid.
258

  
259
    """
260
    instance_name = self.op.instance_name
261

  
262
    self.instance = self.cfg.GetInstanceInfo(instance_name)
263
    assert self.instance is not None, \
264
          "Cannot retrieve locked instance %s" % self.op.instance_name
265
    _CheckNodeOnline(self, self.instance.primary_node)
266

  
267
    if (self.op.remove_instance and
268
        self.instance.admin_state == constants.ADMINST_UP and
269
        not self.op.shutdown):
270
      raise errors.OpPrereqError("Can not remove instance without shutting it"
271
                                 " down before", errors.ECODE_STATE)
272

  
273
    if self.op.mode == constants.EXPORT_MODE_LOCAL:
274
      self.op.target_node = _ExpandNodeName(self.cfg, self.op.target_node)
275
      self.dst_node = self.cfg.GetNodeInfo(self.op.target_node)
276
      assert self.dst_node is not None
277

  
278
      _CheckNodeOnline(self, self.dst_node.name)
279
      _CheckNodeNotDrained(self, self.dst_node.name)
280

  
281
      self._cds = None
282
      self.dest_disk_info = None
283
      self.dest_x509_ca = None
284

  
285
    elif self.op.mode == constants.EXPORT_MODE_REMOTE:
286
      self.dst_node = None
287

  
288
      if len(self.op.target_node) != len(self.instance.disks):
289
        raise errors.OpPrereqError(("Received destination information for %s"
290
                                    " disks, but instance %s has %s disks") %
291
                                   (len(self.op.target_node), instance_name,
292
                                    len(self.instance.disks)),
293
                                   errors.ECODE_INVAL)
294

  
295
      cds = _GetClusterDomainSecret()
296

  
297
      # Check X509 key name
298
      try:
299
        (key_name, hmac_digest, hmac_salt) = self.x509_key_name
300
      except (TypeError, ValueError), err:
301
        raise errors.OpPrereqError("Invalid data for X509 key name: %s" % err,
302
                                   errors.ECODE_INVAL)
303

  
304
      if not utils.VerifySha1Hmac(cds, key_name, hmac_digest, salt=hmac_salt):
305
        raise errors.OpPrereqError("HMAC for X509 key name is wrong",
306
                                   errors.ECODE_INVAL)
307

  
308
      # Load and verify CA
309
      try:
310
        (cert, _) = utils.LoadSignedX509Certificate(self.dest_x509_ca_pem, cds)
311
      except OpenSSL.crypto.Error, err:
312
        raise errors.OpPrereqError("Unable to load destination X509 CA (%s)" %
313
                                   (err, ), errors.ECODE_INVAL)
314

  
315
      (errcode, msg) = utils.VerifyX509Certificate(cert, None, None)
316
      if errcode is not None:
317
        raise errors.OpPrereqError("Invalid destination X509 CA (%s)" %
318
                                   (msg, ), errors.ECODE_INVAL)
319

  
320
      self.dest_x509_ca = cert
321

  
322
      # Verify target information
323
      disk_info = []
324
      for idx, disk_data in enumerate(self.op.target_node):
325
        try:
326
          (host, port, magic) = \
327
            masterd.instance.CheckRemoteExportDiskInfo(cds, idx, disk_data)
328
        except errors.GenericError, err:
329
          raise errors.OpPrereqError("Target info for disk %s: %s" %
330
                                     (idx, err), errors.ECODE_INVAL)
331

  
332
        disk_info.append((host, port, magic))
333

  
334
      assert len(disk_info) == len(self.op.target_node)
335
      self.dest_disk_info = disk_info
336

  
337
    else:
338
      raise errors.ProgrammerError("Unhandled export mode %r" %
339
                                   self.op.mode)
340

  
341
    # instance disk type verification
342
    # TODO: Implement export support for file-based disks
343
    for disk in self.instance.disks:
344
      if disk.dev_type == constants.LD_FILE:
345
        raise errors.OpPrereqError("Export not supported for instances with"
346
                                   " file-based disks", errors.ECODE_INVAL)
347

  
348
  def _CleanupExports(self, feedback_fn):
349
    """Removes exports of current instance from all other nodes.
350

  
351
    If an instance in a cluster with nodes A..D was exported to node C, its
352
    exports will be removed from the nodes A, B and D.
353

  
354
    """
355
    assert self.op.mode != constants.EXPORT_MODE_REMOTE
356

  
357
    nodelist = self.cfg.GetNodeList()
358
    nodelist.remove(self.dst_node.name)
359

  
360
    # on one-node clusters nodelist will be empty after the removal
361
    # if we proceed the backup would be removed because OpBackupQuery
362
    # substitutes an empty list with the full cluster node list.
363
    iname = self.instance.name
364
    if nodelist:
365
      feedback_fn("Removing old exports for instance %s" % iname)
366
      exportlist = self.rpc.call_export_list(nodelist)
367
      for node in exportlist:
368
        if exportlist[node].fail_msg:
369
          continue
370
        if iname in exportlist[node].payload:
371
          msg = self.rpc.call_export_remove(node, iname).fail_msg
372
          if msg:
373
            self.LogWarning("Could not remove older export for instance %s"
374
                            " on node %s: %s", iname, node, msg)
375

  
376
  def Exec(self, feedback_fn):
377
    """Export an instance to an image in the cluster.
378

  
379
    """
380
    assert self.op.mode in constants.EXPORT_MODES
381

  
382
    instance = self.instance
383
    src_node = instance.primary_node
384

  
385
    if self.op.shutdown:
386
      # shutdown the instance, but not the disks
387
      feedback_fn("Shutting down instance %s" % instance.name)
388
      result = self.rpc.call_instance_shutdown(src_node, instance,
389
                                               self.op.shutdown_timeout,
390
                                               self.op.reason)
391
      # TODO: Maybe ignore failures if ignore_remove_failures is set
392
      result.Raise("Could not shutdown instance %s on"
393
                   " node %s" % (instance.name, src_node))
394

  
395
    # set the disks ID correctly since call_instance_start needs the
396
    # correct drbd minor to create the symlinks
397
    for disk in instance.disks:
398
      self.cfg.SetDiskID(disk, src_node)
399

  
400
    activate_disks = (instance.admin_state != constants.ADMINST_UP)
401

  
402
    if activate_disks:
403
      # Activate the instance disks if we'exporting a stopped instance
404
      feedback_fn("Activating disks for %s" % instance.name)
405
      _StartInstanceDisks(self, instance, None)
406

  
407
    try:
408
      helper = masterd.instance.ExportInstanceHelper(self, feedback_fn,
409
                                                     instance)
410

  
411
      helper.CreateSnapshots()
412
      try:
413
        if (self.op.shutdown and
414
            instance.admin_state == constants.ADMINST_UP and
415
            not self.op.remove_instance):
416
          assert not activate_disks
417
          feedback_fn("Starting instance %s" % instance.name)
418
          result = self.rpc.call_instance_start(src_node,
419
                                                (instance, None, None), False,
420
                                                 self.op.reason)
421
          msg = result.fail_msg
422
          if msg:
423
            feedback_fn("Failed to start instance: %s" % msg)
424
            _ShutdownInstanceDisks(self, instance)
425
            raise errors.OpExecError("Could not start instance: %s" % msg)
426

  
427
        if self.op.mode == constants.EXPORT_MODE_LOCAL:
428
          (fin_resu, dresults) = helper.LocalExport(self.dst_node)
429
        elif self.op.mode == constants.EXPORT_MODE_REMOTE:
430
          connect_timeout = constants.RIE_CONNECT_TIMEOUT
431
          timeouts = masterd.instance.ImportExportTimeouts(connect_timeout)
432

  
433
          (key_name, _, _) = self.x509_key_name
434

  
435
          dest_ca_pem = \
436
            OpenSSL.crypto.dump_certificate(OpenSSL.crypto.FILETYPE_PEM,
437
                                            self.dest_x509_ca)
438

  
439
          (fin_resu, dresults) = helper.RemoteExport(self.dest_disk_info,
440
                                                     key_name, dest_ca_pem,
441
                                                     timeouts)
442
      finally:
443
        helper.Cleanup()
444

  
445
      # Check for backwards compatibility
446
      assert len(dresults) == len(instance.disks)
447
      assert compat.all(isinstance(i, bool) for i in dresults), \
448
             "Not all results are boolean: %r" % dresults
449

  
450
    finally:
451
      if activate_disks:
452
        feedback_fn("Deactivating disks for %s" % instance.name)
453
        _ShutdownInstanceDisks(self, instance)
454

  
455
    if not (compat.all(dresults) and fin_resu):
456
      failures = []
457
      if not fin_resu:
458
        failures.append("export finalization")
459
      if not compat.all(dresults):
460
        fdsk = utils.CommaJoin(idx for (idx, dsk) in enumerate(dresults)
461
                               if not dsk)
462
        failures.append("disk export: disk(s) %s" % fdsk)
463

  
464
      raise errors.OpExecError("Export failed, errors in %s" %
465
                               utils.CommaJoin(failures))
466

  
467
    # At this point, the export was successful, we can cleanup/finish
468

  
469
    # Remove instance if requested
470
    if self.op.remove_instance:
471
      feedback_fn("Removing instance %s" % instance.name)
472
      _RemoveInstance(self, feedback_fn, instance,
473
                      self.op.ignore_remove_failures)
474

  
475
    if self.op.mode == constants.EXPORT_MODE_LOCAL:
476
      self._CleanupExports(feedback_fn)
477

  
478
    return fin_resu, dresults
479

  
480

  
481
class LUBackupRemove(NoHooksLU):
482
  """Remove exports related to the named instance.
483

  
484
  """
485
  REQ_BGL = False
486

  
487
  def ExpandNames(self):
488
    self.needed_locks = {
489
      # We need all nodes to be locked in order for RemoveExport to work, but
490
      # we don't need to lock the instance itself, as nothing will happen to it
491
      # (and we can remove exports also for a removed instance)
492
      locking.LEVEL_NODE: locking.ALL_SET,
493

  
494
      # Removing backups is quick, so blocking allocations is justified
495
      locking.LEVEL_NODE_ALLOC: locking.ALL_SET,
496
      }
497

  
498
    # Allocations should be stopped while this LU runs with node locks, but it
499
    # doesn't have to be exclusive
500
    self.share_locks[locking.LEVEL_NODE_ALLOC] = 1
501

  
502
  def Exec(self, feedback_fn):
503
    """Remove any export.
504

  
505
    """
506
    instance_name = self.cfg.ExpandInstanceName(self.op.instance_name)
507
    # If the instance was not found we'll try with the name that was passed in.
508
    # This will only work if it was an FQDN, though.
509
    fqdn_warn = False
510
    if not instance_name:
511
      fqdn_warn = True
512
      instance_name = self.op.instance_name
513

  
514
    locked_nodes = self.owned_locks(locking.LEVEL_NODE)
515
    exportlist = self.rpc.call_export_list(locked_nodes)
516
    found = False
517
    for node in exportlist:
518
      msg = exportlist[node].fail_msg
519
      if msg:
520
        self.LogWarning("Failed to query node %s (continuing): %s", node, msg)
521
        continue
522
      if instance_name in exportlist[node].payload:
523
        found = True
524
        result = self.rpc.call_export_remove(node, instance_name)
525
        msg = result.fail_msg
526
        if msg:
527
          logging.error("Could not remove export for instance %s"
528
                        " on node %s: %s", instance_name, node, msg)
529

  
530
    if fqdn_warn and not found:
531
      feedback_fn("Export not found. If trying to remove an export belonging"
532
                  " to a deleted instance please use its Fully Qualified"
533
                  " Domain Name.")

Also available in: Unified diff