Add GetInstanceNetworks() config method
[ganeti-local] / lib / masterd / instance.py
index d83e836..d99f4d8 100644 (file)
@@ -1,7 +1,7 @@
 #
 #
 
-# Copyright (C) 2010 Google Inc.
+# Copyright (C) 2010, 2011 Google Inc.
 #
 # This program is free software; you can redistribute it and/or modify
 # it under the terms of the GNU General Public License as published by
@@ -33,6 +33,7 @@ from ganeti import compat
 from ganeti import utils
 from ganeti import objects
 from ganeti import netutils
+from ganeti import pathutils
 
 
 class _ImportExportError(Exception):
@@ -92,12 +93,13 @@ class ImportExportCbBase(object):
   """Callbacks for disk import/export.
 
   """
-  def ReportListening(self, ie, private):
+  def ReportListening(self, ie, private, component):
     """Called when daemon started listening.
 
     @type ie: Subclass of L{_DiskImportExportBase}
     @param ie: Import/export object
     @param private: Private data passed to import/export object
+    @param component: transfer component name
 
     """
 
@@ -129,18 +131,11 @@ class ImportExportCbBase(object):
     """
 
 
-def _TimeoutExpired(epoch, timeout, _time_fn=time.time):
-  """Checks whether a timeout has expired.
-
-  """
-  return _time_fn() > (epoch + timeout)
-
-
 class _DiskImportExportBase(object):
   MODE_TEXT = None
 
   def __init__(self, lu, node_name, opts,
-               instance, timeouts, cbs, private=None):
+               instance, component, timeouts, cbs, private=None):
     """Initializes this class.
 
     @param lu: Logical unit instance
@@ -150,6 +145,8 @@ class _DiskImportExportBase(object):
     @param opts: Import/export daemon options
     @type instance: L{objects.Instance}
     @param instance: Instance object
+    @type component: string
+    @param component: which part of the instance is being imported
     @type timeouts: L{ImportExportTimeouts}
     @param timeouts: Timeouts for this import
     @type cbs: L{ImportExportCbBase}
@@ -161,12 +158,17 @@ class _DiskImportExportBase(object):
 
     self._lu = lu
     self.node_name = node_name
-    self._opts = opts
+    self._opts = opts.Copy()
     self._instance = instance
+    self._component = component
     self._timeouts = timeouts
     self._cbs = cbs
     self._private = private
 
+    # Set master daemon's timeout in options for import/export daemon
+    assert self._opts.connect_timeout is None
+    self._opts.connect_timeout = timeouts.connect
+
     # Parent loop
     self._loop = None
 
@@ -192,7 +194,7 @@ class _DiskImportExportBase(object):
 
     """
     if self._daemon:
-      return self._daemon.recent_output
+      return "\n".join(self._daemon.recent_output)
 
     return None
 
@@ -269,7 +271,7 @@ class _DiskImportExportBase(object):
 
       daemon_name = result.payload
 
-      logging.info("Started %s %r on %s", self.MODE_TEXT, daemon_name,
+      logging.info("Started %s '%s' on %s", self.MODE_TEXT, daemon_name,
                    self.node_name)
 
       self._ts_begin = time.time()
@@ -290,11 +292,11 @@ class _DiskImportExportBase(object):
 
     """
     if self._daemon_name:
-      self._lu.LogWarning("Aborting %s %r on %s",
+      self._lu.LogWarning("Aborting %s '%s' on %s",
                           self.MODE_TEXT, self._daemon_name, self.node_name)
       result = self._lu.rpc.call_impexp_abort(self.node_name, self._daemon_name)
       if result.fail_msg:
-        self._lu.LogWarning("Failed to abort %s %r on %s: %s",
+        self._lu.LogWarning("Failed to abort %s '%s' on %s: %s",
                             self.MODE_TEXT, self._daemon_name,
                             self.node_name, result.fail_msg)
         return False
@@ -311,7 +313,7 @@ class _DiskImportExportBase(object):
     assert self._ts_begin is not None
 
     if not data:
-      if _TimeoutExpired(self._ts_begin, self._timeouts.ready):
+      if utils.TimeoutExpired(self._ts_begin, self._timeouts.ready):
         raise _ImportExportError("Didn't become ready after %s seconds" %
                                  self._timeouts.ready)
 
@@ -334,7 +336,7 @@ class _DiskImportExportBase(object):
       if self._ts_last_error is None:
         self._ts_last_error = time.time()
 
-      elif _TimeoutExpired(self._ts_last_error, self._timeouts.error):
+      elif utils.TimeoutExpired(self._ts_last_error, self._timeouts.error):
         raise _ImportExportError("Too many errors while updating data")
 
       return False
@@ -371,14 +373,15 @@ class _DiskImportExportBase(object):
       self._ts_connected = time.time()
 
       # TODO: Log remote peer
-      logging.debug("%s %r on %s is now connected",
+      logging.debug("%s '%s' on %s is now connected",
                     self.MODE_TEXT, self._daemon_name, self.node_name)
 
       self._cbs.ReportConnected(self, self._private)
 
       return True
 
-    if _TimeoutExpired(self._GetConnectedCheckEpoch(), self._timeouts.connect):
+    if utils.TimeoutExpired(self._GetConnectedCheckEpoch(),
+                            self._timeouts.connect):
       raise _ImportExportError("Not connected after %s seconds" %
                                self._timeouts.connect)
 
@@ -389,7 +392,8 @@ class _DiskImportExportBase(object):
 
     """
     if ((self._ts_last_progress is None or
-         _TimeoutExpired(self._ts_last_progress, self._timeouts.progress)) and
+        utils.TimeoutExpired(self._ts_last_progress,
+                             self._timeouts.progress)) and
         self._daemon and
         self._daemon.progress_mbytes is not None and
         self._daemon.progress_throughput is not None):
@@ -435,10 +439,10 @@ class _DiskImportExportBase(object):
     self.final_message = message
 
     if success:
-      logging.info("%s %r on %s succeeded", self.MODE_TEXT, self._daemon_name,
-                   self.node_name)
+      logging.info("%s '%s' on %s succeeded", self.MODE_TEXT,
+                   self._daemon_name, self.node_name)
     elif self._daemon_name:
-      self._lu.LogWarning("%s %r on %s failed: %s",
+      self._lu.LogWarning("%s '%s' on %s failed: %s",
                           self.MODE_TEXT, self._daemon_name, self.node_name,
                           message)
     else:
@@ -458,12 +462,12 @@ class _DiskImportExportBase(object):
 
     """
     if self._daemon_name:
-      logging.info("Finalizing %s %r on %s",
+      logging.info("Finalizing %s '%s' on %s",
                    self.MODE_TEXT, self._daemon_name, self.node_name)
 
       result = self._Finalize()
       if result.fail_msg:
-        self._lu.LogWarning("Failed to finalize %s %r on %s: %s",
+        self._lu.LogWarning("Failed to finalize %s '%s' on %s: %s",
                             self.MODE_TEXT, self._daemon_name,
                             self.node_name, result.fail_msg)
         return False
@@ -481,7 +485,7 @@ class _DiskImportExportBase(object):
 class DiskImport(_DiskImportExportBase):
   MODE_TEXT = "import"
 
-  def __init__(self, lu, node_name, opts, instance,
+  def __init__(self, lu, node_name, opts, instance, component,
                dest, dest_args, timeouts, cbs, private=None):
     """Initializes this class.
 
@@ -492,6 +496,8 @@ class DiskImport(_DiskImportExportBase):
     @param opts: Import/export daemon options
     @type instance: L{objects.Instance}
     @param instance: Instance object
+    @type component: string
+    @param component: which part of the instance is being imported
     @param dest: I/O destination
     @param dest_args: I/O arguments
     @type timeouts: L{ImportExportTimeouts}
@@ -501,8 +507,8 @@ class DiskImport(_DiskImportExportBase):
     @param private: Private data for callback functions
 
     """
-    _DiskImportExportBase.__init__(self, lu, node_name, opts,
-                                   instance, timeouts, cbs, private)
+    _DiskImportExportBase.__init__(self, lu, node_name, opts, instance,
+                                   component, timeouts, cbs, private)
     self._dest = dest
     self._dest_args = dest_args
 
@@ -524,8 +530,8 @@ class DiskImport(_DiskImportExportBase):
 
     """
     return self._lu.rpc.call_import_start(self.node_name, self._opts,
-                                          self._instance,
-                                          self._dest, self._dest_args)
+                                          self._instance, self._component,
+                                          (self._dest, self._dest_args))
 
   def CheckListening(self):
     """Checks whether the daemon is listening.
@@ -543,14 +549,14 @@ class DiskImport(_DiskImportExportBase):
     if port is not None:
       self._ts_listening = time.time()
 
-      logging.debug("Import %r on %s is now listening on port %s",
+      logging.debug("Import '%s' on %s is now listening on port %s",
                     self._daemon_name, self.node_name, port)
 
-      self._cbs.ReportListening(self, self._private)
+      self._cbs.ReportListening(self, self._private, self._component)
 
       return True
 
-    if _TimeoutExpired(self._ts_begin, self._timeouts.listen):
+    if utils.TimeoutExpired(self._ts_begin, self._timeouts.listen):
       raise _ImportExportError("Not listening after %s seconds" %
                                self._timeouts.listen)
 
@@ -570,8 +576,8 @@ class DiskImport(_DiskImportExportBase):
 class DiskExport(_DiskImportExportBase):
   MODE_TEXT = "export"
 
-  def __init__(self, lu, node_name, opts,
-               dest_host, dest_port, instance, source, source_args,
+  def __init__(self, lu, node_name, opts, dest_host, dest_port,
+               instance, component, source, source_args,
                timeouts, cbs, private=None):
     """Initializes this class.
 
@@ -586,6 +592,8 @@ class DiskExport(_DiskImportExportBase):
     @param dest_port: Destination port number
     @type instance: L{objects.Instance}
     @param instance: Instance object
+    @type component: string
+    @param component: which part of the instance is being imported
     @param source: I/O source
     @param source_args: I/O source
     @type timeouts: L{ImportExportTimeouts}
@@ -595,8 +603,8 @@ class DiskExport(_DiskImportExportBase):
     @param private: Private data for callback functions
 
     """
-    _DiskImportExportBase.__init__(self, lu, node_name, opts,
-                                   instance, timeouts, cbs, private)
+    _DiskImportExportBase.__init__(self, lu, node_name, opts, instance,
+                                   component, timeouts, cbs, private)
     self._dest_host = dest_host
     self._dest_port = dest_port
     self._source = source
@@ -608,8 +616,8 @@ class DiskExport(_DiskImportExportBase):
     """
     return self._lu.rpc.call_export_start(self.node_name, self._opts,
                                           self._dest_host, self._dest_port,
-                                          self._instance, self._source,
-                                          self._source_args)
+                                          self._instance, self._component,
+                                          (self._source, self._source_args))
 
   def CheckListening(self):
     """Checks whether the daemon is listening.
@@ -865,7 +873,7 @@ class _TransferInstSourceCb(_TransferInstCbBase):
     if ie.success:
       self.feedback_fn("%s finished sending data" % dtp.data.name)
     else:
-      self.feedback_fn("%s failed to send data: %s (recent output: %r)" %
+      self.feedback_fn("%s failed to send data: %s (recent output: %s)" %
                        (dtp.data.name, ie.final_message, ie.recent_output))
 
     dtp.RecordResult(ie.success)
@@ -881,7 +889,7 @@ class _TransferInstSourceCb(_TransferInstCbBase):
 
 
 class _TransferInstDestCb(_TransferInstCbBase):
-  def ReportListening(self, ie, dtp):
+  def ReportListening(self, ie, dtp, component):
     """Called when daemon started listening.
 
     """
@@ -894,8 +902,8 @@ class _TransferInstDestCb(_TransferInstCbBase):
 
     # Start export on source node
     de = DiskExport(self.lu, self.src_node, dtp.export_opts,
-                    self.dest_ip, ie.listen_port,
-                    self.instance, dtp.data.src_io, dtp.data.src_ioargs,
+                    self.dest_ip, ie.listen_port, self.instance,
+                    component, dtp.data.src_io, dtp.data.src_ioargs,
                     self.timeouts, self.src_cbs, private=dtp)
     ie.loop.Add(de)
 
@@ -915,7 +923,7 @@ class _TransferInstDestCb(_TransferInstCbBase):
     if ie.success:
       self.feedback_fn("%s finished receiving data" % dtp.data.name)
     else:
-      self.feedback_fn("%s failed to receive data: %s (recent output: %r)" %
+      self.feedback_fn("%s failed to receive data: %s (recent output: %s)" %
                        (dtp.data.name, ie.final_message, ie.recent_output))
 
     dtp.RecordResult(ie.success)
@@ -1045,7 +1053,7 @@ def TransferInstanceData(lu, feedback_fn, src_node, dest_node, dest_ip,
 
         dtp = _DiskTransferPrivate(transfer, True, opts)
 
-        di = DiskImport(lu, dest_node, opts, instance,
+        di = DiskImport(lu, dest_node, opts, instance, "disk%d" % idx,
                         transfer.dest_io, transfer.dest_ioargs,
                         timeouts, dest_cbs, private=dtp)
         ieloop.Add(di)
@@ -1116,7 +1124,7 @@ class _RemoteExportCb(ImportExportCbBase):
     if ie.success:
       self._feedback_fn("Disk %s finished sending data" % idx)
     else:
-      self._feedback_fn("Disk %s failed to send data: %s (recent output: %r)" %
+      self._feedback_fn("Disk %s failed to send data: %s (recent output: %s)" %
                         (idx, ie.final_message, ie.recent_output))
 
     self._dresults[idx] = bool(ie.success)
@@ -1157,7 +1165,7 @@ class ExportInstanceHelper:
 
       # result.payload will be a snapshot of an lvm leaf of the one we
       # passed
-      result = self._lu.rpc.call_blockdev_snapshot(src_node, disk)
+      result = self._lu.rpc.call_blockdev_snapshot(src_node, (disk, instance))
       new_dev = False
       msg = result.fail_msg
       if msg:
@@ -1169,9 +1177,11 @@ class ExportInstanceHelper:
                             " result '%s'", idx, src_node, result.payload)
       else:
         disk_id = tuple(result.payload)
+        disk_params = constants.DISK_LD_DEFAULTS[constants.LD_LV].copy()
         new_dev = objects.Disk(dev_type=constants.LD_LV, size=disk.size,
                                logical_id=disk_id, physical_id=disk_id,
-                               iv_name=disk.iv_name)
+                               iv_name=disk.iv_name,
+                               params=disk_params)
 
       self._snap_disks.append(new_dev)
 
@@ -1218,7 +1228,7 @@ class ExportInstanceHelper:
         transfers.append(None)
         continue
 
-      path = utils.PathJoin(constants.EXPORT_DIR, "%s.new" % instance.name,
+      path = utils.PathJoin(pathutils.EXPORT_DIR, "%s.new" % instance.name,
                             dev.physical_id[1])
 
       finished_fn = compat.partial(self._TransferFinished, idx)
@@ -1282,7 +1292,7 @@ class ExportInstanceHelper:
         self._feedback_fn("Sending disk %s to %s:%s" % (idx, host, port))
         finished_fn = compat.partial(self._TransferFinished, idx)
         ieloop.Add(DiskExport(self._lu, instance.primary_node,
-                              opts, host, port, instance,
+                              opts, host, port, instance, "disk%d" % idx,
                               constants.IEIO_SCRIPT, (dev, idx),
                               timeouts, cbs, private=(idx, finished_fn)))
 
@@ -1368,7 +1378,7 @@ class _RemoteImportCb(ImportExportCbBase):
       "x509_ca": self._x509_cert_pem,
       })
 
-  def ReportListening(self, ie, private):
+  def ReportListening(self, ie, private, _):
     """Called when daemon started listening.
 
     """
@@ -1403,7 +1413,7 @@ class _RemoteImportCb(ImportExportCbBase):
       self._feedback_fn("Disk %s finished receiving data" % idx)
     else:
       self._feedback_fn(("Disk %s failed to receive data: %s"
-                         " (recent output: %r)") %
+                         " (recent output: %s)") %
                         (idx, ie.final_message, ie.recent_output))
 
     self._dresults[idx] = bool(ie.success)
@@ -1464,6 +1474,7 @@ def RemoteImport(lu, feedback_fn, instance, pnode, source_x509_ca,
                                            magic=magic, ipv6=ipv6)
 
         ieloop.Add(DiskImport(lu, instance.primary_node, opts, instance,
+                              "disk%d" % idx,
                               constants.IEIO_SCRIPT, (dev, idx),
                               timeouts, cbs, private=(idx, )))
 
@@ -1595,3 +1606,35 @@ def ComputeRemoteImportDiskInfo(cds, salt, disk_index, host, port, magic):
   msg = _GetRieDiskInfoMessage(disk_index, host, port, magic)
   hmac_digest = utils.Sha1Hmac(cds, msg, salt=salt)
   return (host, port, magic, hmac_digest, salt)
+
+
+def CalculateGroupIPolicy(cluster, group):
+  """Calculate instance policy for group.
+
+  """
+  return cluster.SimpleFillIPolicy(group.ipolicy)
+
+
+def ComputeDiskSize(disk_template, disks):
+  """Compute disk size requirements according to disk template
+
+  """
+  # Required free disk space as a function of disk and swap space
+  req_size_dict = {
+    constants.DT_DISKLESS: None,
+    constants.DT_PLAIN: sum(d[constants.IDISK_SIZE] for d in disks),
+    # 128 MB are added for drbd metadata for each disk
+    constants.DT_DRBD8:
+      sum(d[constants.IDISK_SIZE] + constants.DRBD_META_SIZE for d in disks),
+    constants.DT_FILE: sum(d[constants.IDISK_SIZE] for d in disks),
+    constants.DT_SHARED_FILE: sum(d[constants.IDISK_SIZE] for d in disks),
+    constants.DT_BLOCK: 0,
+    constants.DT_RBD: sum(d[constants.IDISK_SIZE] for d in disks),
+    constants.DT_EXT: sum(d[constants.IDISK_SIZE] for d in disks),
+  }
+
+  if disk_template not in req_size_dict:
+    raise errors.ProgrammerError("Disk template '%s' size requirement"
+                                 " is unknown" % disk_template)
+
+  return req_size_dict[disk_template]