4 # Copyright (C) 2010, 2011 Google Inc.
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 # General Public License for more details.
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
"""Instance-related functions and classes for masterd.

"""
30 from ganeti import constants
31 from ganeti import errors
32 from ganeti import compat
33 from ganeti import utils
34 from ganeti import objects
35 from ganeti import netutils
36 from ganeti import pathutils
class _ImportExportError(Exception):
  """Local exception to report import/export errors.

  Raised internally by the import/export helpers; callers catch it and
  record the failure via C{Finalize(error=...)} instead of propagating.

  """
class ImportExportTimeouts(object):
  """Timeouts for a single import/export operation.

  """
  #: Time until daemon starts writing status file
  DEFAULT_READY_TIMEOUT = 10

  #: Length of time until errors cause hard failure
  DEFAULT_ERROR_TIMEOUT = 10

  #: Time after which daemon must be listening
  DEFAULT_LISTEN_TIMEOUT = 10

  #: Progress update interval
  DEFAULT_PROGRESS_INTERVAL = 60

  def __init__(self, connect,
               listen=DEFAULT_LISTEN_TIMEOUT,
               error=DEFAULT_ERROR_TIMEOUT,
               ready=DEFAULT_READY_TIMEOUT,
               progress=DEFAULT_PROGRESS_INTERVAL):
    """Initializes this class.

    @type connect: number
    @param connect: Timeout for establishing connection
    @type listen: number
    @param listen: Timeout for starting to listen for connections
    @type error: number
    @param error: Length of time until errors cause hard failure
    @type ready: number
    @param ready: Timeout for daemon to become ready
    @type progress: number
    @param progress: Progress update interval

    """
    # Every parameter must be stored: consumers read timeouts.ready,
    # timeouts.error, timeouts.listen, timeouts.connect and
    # timeouts.progress (the excerpt had dropped three of these
    # assignments, leaving listen/error/ready unused).
    self.error = error
    self.ready = ready
    self.listen = listen
    self.connect = connect
    self.progress = progress
class ImportExportCbBase(object):
  """Callbacks for disk import/export.

  Subclasses override the notification hooks they are interested in; the
  base implementations deliberately do nothing.

  """
  def ReportListening(self, ie, private, component):
    """Notification that the daemon has started listening.

    @type ie: Subclass of L{_DiskImportExportBase}
    @param ie: Import/export object
    @param private: Private data passed to import/export object
    @param component: transfer component name

    """

  def ReportConnected(self, ie, private):
    """Notification that a connection has been established.

    @type ie: Subclass of L{_DiskImportExportBase}
    @param ie: Import/export object
    @param private: Private data passed to import/export object

    """

  def ReportProgress(self, ie, private):
    """Notification that fresh progress information is available.

    @type ie: Subclass of L{_DiskImportExportBase}
    @param ie: Import/export object
    @param private: Private data passed to import/export object

    """

  def ReportFinished(self, ie, private):
    """Notification that a transfer has finished.

    @type ie: Subclass of L{_DiskImportExportBase}
    @param ie: Import/export object
    @param private: Private data passed to import/export object

    """
class _DiskImportExportBase(object):
  # NOTE(review): this excerpt elides a number of lines from this class
  # (attribute initializations such as self._lu/self._cbs/self.success/
  # self._daemon, several @property decorators and "def" lines, guard
  # conditions and "return" statements).  Inline notes mark the most
  # confusing gaps; confirm everything against the complete file.
  def __init__(self, lu, node_uuid, opts,
               instance, component, timeouts, cbs, private=None):
    """Initializes this class.

    @param lu: Logical unit instance
    @type node_uuid: string
    @param node_uuid: Node UUID for import
    @type opts: L{objects.ImportExportOptions}
    @param opts: Import/export daemon options
    @type instance: L{objects.Instance}
    @param instance: Instance object
    @type component: string
    @param component: which part of the instance is being imported
    @type timeouts: L{ImportExportTimeouts}
    @param timeouts: Timeouts for this import
    @type cbs: L{ImportExportCbBase}
    @param cbs: Callbacks
    @param private: Private data for callback functions

    """
    # Subclasses are expected to define a truthy MODE_TEXT class attribute
    assert self.MODE_TEXT

    self.node_uuid = node_uuid
    self.node_name = lu.cfg.GetNodeName(node_uuid)
    self._opts = opts.Copy()
    self._instance = instance
    self._component = component
    self._timeouts = timeouts
    self._private = private

    # Set master daemon's timeout in options for import/export daemon
    assert self._opts.connect_timeout is None
    self._opts.connect_timeout = timeouts.connect

    # Timestamps for the phases of the transfer's life cycle; all start
    # as None and are set via time.time() as each phase is reached
    self._ts_begin = None
    self._ts_connected = None
    self._ts_finished = None
    self._ts_cleanup = None
    self._ts_last_progress = None
    self._ts_last_error = None

    # Error message of a finished transfer (set by _ReportFinished)
    self.final_message = None

    # Name of the remote daemon once started (see CheckDaemon)
    self._daemon_name = None

  # NOTE(review): presumably decorated with @property in the full file
  def recent_output(self):
    """Returns the most recent output from the daemon.

    """
    return "\n".join(self._daemon.recent_output)

  # NOTE(review): "def progress(self):" (presumably a @property) is
  # missing from this excerpt
    """Returns transfer progress information.

    """
    # 4-tuple consumed by L{FormatProgress}
    return (self._daemon.progress_mbytes,
            self._daemon.progress_throughput,
            self._daemon.progress_percent,
            self._daemon.progress_eta)

  # NOTE(review): "def magic(self):" (presumably a @property) is missing
    """Returns the magic value for this import/export.

    """
    return self._opts.magic

  # NOTE(review): "def active(self):" (presumably a @property) is missing
    """Determines whether this transport is still active.

    """
    # A transfer is active until a success/failure result is recorded
    return self.success is None

  # NOTE(review): "def loop(self):" (presumably a @property) is missing
    """Returns parent loop.

    @rtype: L{ImportExportLoop}

    """

  def SetLoop(self, loop):
    """Sets the parent loop.

    @type loop: L{ImportExportLoop}

    """
    # NOTE(review): the guard/else around this raise is elided; as shown
    # it would raise unconditionally
    raise errors.ProgrammerError("Loop can only be set once")

  def _StartDaemon(self):
    """Starts the import/export daemon.

    Must be implemented by subclasses (see L{DiskImport}/L{DiskExport}).

    """
    raise NotImplementedError()

  def CheckDaemon(self):
    """Checks whether daemon has been started and if not, starts it.

    @return: Name of daemon

    """
    assert self._ts_cleanup is None

    if self._daemon_name is None:
      assert self._ts_begin is None

      result = self._StartDaemon()
      # NOTE(review): the "if result.fail_msg:" guard and the final
      # argument/closing parenthesis of this raise are elided
        raise _ImportExportError("Failed to start %s on %s: %s" %
                                 (self.MODE_TEXT, self.node_name,

      daemon_name = result.payload

      # NOTE(review): trailing argument(s) of this logging call elided
      logging.info("Started %s '%s' on %s", self.MODE_TEXT, daemon_name,

      self._ts_begin = time.time()
      self._daemon_name = daemon_name

    return self._daemon_name

  def GetDaemonName(self):
    """Returns the daemon name.

    """
    assert self._daemon_name, "Daemon has not been started"
    assert self._ts_cleanup is None
    return self._daemon_name

  # NOTE(review): "def Abort(self):" line missing from this excerpt
    """Sends SIGTERM to import/export daemon (if still active).

    """
    if self._daemon_name:
      self._lu.LogWarning("Aborting %s '%s' on %s",
                          self.MODE_TEXT, self._daemon_name, self.node_uuid)
      result = self._lu.rpc.call_impexp_abort(self.node_uuid, self._daemon_name)
      # NOTE(review): presumably guarded by "if result.fail_msg:"
        self._lu.LogWarning("Failed to abort %s '%s' on %s: %s",
                            self.MODE_TEXT, self._daemon_name,
                            self.node_uuid, result.fail_msg)

  def _SetDaemonData(self, data):
    """Internal function for updating status daemon data.

    @type data: L{objects.ImportExportStatus}
    @param data: Daemon status data

    """
    assert self._ts_begin is not None

    # NOTE(review): presumably guarded by "if not data:" -- the ready
    # timeout should only apply while no status data has arrived yet
    if utils.TimeoutExpired(self._ts_begin, self._timeouts.ready):
      raise _ImportExportError("Didn't become ready after %s seconds" %
                               self._timeouts.ready)

  def SetDaemonData(self, success, data):
    """Updates daemon status data.

    @type success: bool
    @param success: Whether fetching data was successful or not
    @type data: L{objects.ImportExportStatus}
    @param data: Daemon status data

    """
    # NOTE(review): presumably guarded by "if not success:" with the
    # error-timestamp reset below in the success path
    if self._ts_last_error is None:
      self._ts_last_error = time.time()

    elif utils.TimeoutExpired(self._ts_last_error, self._timeouts.error):
      raise _ImportExportError("Too many errors while updating data")

    self._ts_last_error = None

    return self._SetDaemonData(data)

  def CheckListening(self):
    """Checks whether the daemon is listening.

    """
    raise NotImplementedError()

  def _GetConnectedCheckEpoch(self):
    """Returns timeout to calculate connect timeout.

    """
    raise NotImplementedError()

  def CheckConnected(self):
    """Checks whether the daemon is connected.

    @rtype: bool
    @return: Whether the daemon is connected

    """
    assert self._daemon, "Daemon status missing"

    # NOTE(review): body of this branch (presumably "return True") elided
    if self._ts_connected is not None:

    if self._daemon.connected:
      self._ts_connected = time.time()

      # TODO: Log remote peer
      logging.debug("%s '%s' on %s is now connected",
                    self.MODE_TEXT, self._daemon_name, self.node_uuid)

      self._cbs.ReportConnected(self, self._private)

    if utils.TimeoutExpired(self._GetConnectedCheckEpoch(),
                            self._timeouts.connect):
      raise _ImportExportError("Not connected after %s seconds" %
                               self._timeouts.connect)

  def _CheckProgress(self):
    """Checks whether a progress update should be reported.

    """
    # Report at most once per progress interval, and only once the
    # daemon has actually reported transfer statistics
    if ((self._ts_last_progress is None or
         utils.TimeoutExpired(self._ts_last_progress,
                              self._timeouts.progress)) and
        self._daemon.progress_mbytes is not None and
        self._daemon.progress_throughput is not None):
      self._cbs.ReportProgress(self, self._private)
      self._ts_last_progress = time.time()

  def CheckFinished(self):
    """Checks whether the daemon exited.

    @rtype: bool
    @return: Whether the transfer is finished

    """
    assert self._daemon, "Daemon status missing"

    # NOTE(review): body of this branch (presumably "return True") elided
    if self._ts_finished:

    if self._daemon.exit_status is None:
      # TODO: Adjust delay for ETA expiring soon
      self._CheckProgress()

    self._ts_finished = time.time()

    # Zero exit status from the daemon counts as success
    self._ReportFinished(self._daemon.exit_status == 0,
                         self._daemon.error_message)

  def _ReportFinished(self, success, message):
    """Transfer is finished or daemon exited.

    @type success: bool
    @param success: Whether the transfer was successful
    @type message: string
    @param message: Error message

    """
    assert self.success is None

    self.success = success
    self.final_message = message

    # NOTE(review): presumably preceded by "if success:"
      logging.info("%s '%s' on %s succeeded", self.MODE_TEXT,
                   self._daemon_name, self.node_uuid)
    elif self._daemon_name:
      self._lu.LogWarning("%s '%s' on %s failed: %s",
                          self.MODE_TEXT, self._daemon_name,
                          self._lu.cfg.GetNodeName(self.node_uuid),
    # NOTE(review): trailing "message" argument and "else:" header elided
      self._lu.LogWarning("%s on %s failed: %s", self.MODE_TEXT,
                          self._lu.cfg.GetNodeName(self.node_uuid), message)

    self._cbs.ReportFinished(self, self._private)

  # NOTE(review): "def _Finalize(self):" line missing from this excerpt
    """Makes the RPC call to finalize this import/export.

    """
    return self._lu.rpc.call_impexp_cleanup(self.node_uuid, self._daemon_name)

  def Finalize(self, error=None):
    """Finalizes this import/export.

    @type error: string
    @param error: Error message if transfer failed

    """
    if self._daemon_name:
      logging.info("Finalizing %s '%s' on %s",
                   self.MODE_TEXT, self._daemon_name, self.node_uuid)

      result = self._Finalize()
      # NOTE(review): presumably guarded by "if result.fail_msg:"
        self._lu.LogWarning("Failed to finalize %s '%s' on %s: %s",
                            self.MODE_TEXT, self._daemon_name,
                            self.node_uuid, result.fail_msg)

      # Daemon is no longer running
      self._daemon_name = None

    self._ts_cleanup = time.time()

    # NOTE(review): presumably guarded by "if error:"
    self._ReportFinished(False, error)
class DiskImport(_DiskImportExportBase):
  # NOTE(review): this excerpt elides some lines from this class (the
  # MODE_TEXT class attribute, "self._dest = dest", @property decorators
  # and several "return" statements); confirm against the complete file.
  def __init__(self, lu, node_uuid, opts, instance, component,
               dest, dest_args, timeouts, cbs, private=None):
    """Initializes this class.

    @param lu: Logical unit instance
    @type node_uuid: string
    @param node_uuid: Node name for import
    @type opts: L{objects.ImportExportOptions}
    @param opts: Import/export daemon options
    @type instance: L{objects.Instance}
    @param instance: Instance object
    @type component: string
    @param component: which part of the instance is being imported
    @param dest: I/O destination
    @param dest_args: I/O arguments
    @type timeouts: L{ImportExportTimeouts}
    @param timeouts: Timeouts for this import
    @type cbs: L{ImportExportCbBase}
    @param cbs: Callbacks
    @param private: Private data for callback functions

    """
    _DiskImportExportBase.__init__(self, lu, node_uuid, opts, instance,
                                   component, timeouts, cbs, private)
    self._dest_args = dest_args

    # Timestamp for when the daemon started listening
    self._ts_listening = None

  # NOTE(review): presumably decorated with @property
  def listen_port(self):
    """Returns the port the daemon is listening on.

    """
    return self._daemon.listen_port

  def _StartDaemon(self):
    """Starts the import daemon.

    """
    return self._lu.rpc.call_import_start(self.node_uuid, self._opts,
                                          self._instance, self._component,
                                          (self._dest, self._dest_args))

  def CheckListening(self):
    """Checks whether the daemon is listening.

    @rtype: bool
    @return: Whether the daemon is listening

    """
    assert self._daemon, "Daemon status missing"

    # NOTE(review): body of this branch (presumably "return True") elided
    if self._ts_listening is not None:

    port = self._daemon.listen_port
    # NOTE(review): presumably guarded by "if port is not None:"
      self._ts_listening = time.time()

      logging.debug("Import '%s' on %s is now listening on port %s",
                    self._daemon_name, self.node_uuid, port)

      self._cbs.ReportListening(self, self._private, self._component)

    if utils.TimeoutExpired(self._ts_begin, self._timeouts.listen):
      raise _ImportExportError("Not listening after %s seconds" %
                               self._timeouts.listen)

  def _GetConnectedCheckEpoch(self):
    """Returns the time since we started listening.

    """
    assert self._ts_listening is not None, \
           ("Checking whether an import is connected is only useful"
            " once it's been listening")

    return self._ts_listening
class DiskExport(_DiskImportExportBase):
  """Handles one disk export to a listening remote import daemon.

  """
  # NOTE(review): reconstructed; the base class asserts a truthy
  # MODE_TEXT -- confirm the exact value against the complete file
  MODE_TEXT = "export"

  def __init__(self, lu, node_uuid, opts, dest_host, dest_port,
               instance, component, source, source_args,
               timeouts, cbs, private=None):
    """Initializes this class.

    @param lu: Logical unit instance
    @type node_uuid: string
    @param node_uuid: Node UUID for import
    @type opts: L{objects.ImportExportOptions}
    @param opts: Import/export daemon options
    @type dest_host: string
    @param dest_host: Destination host name or IP address
    @type dest_port: number
    @param dest_port: Destination port number
    @type instance: L{objects.Instance}
    @param instance: Instance object
    @type component: string
    @param component: which part of the instance is being imported
    @param source: I/O source
    @param source_args: I/O source arguments
    @type timeouts: L{ImportExportTimeouts}
    @param timeouts: Timeouts for this import
    @type cbs: L{ImportExportCbBase}
    @param cbs: Callbacks
    @param private: Private data for callback functions

    """
    _DiskImportExportBase.__init__(self, lu, node_uuid, opts, instance,
                                   component, timeouts, cbs, private)
    self._dest_host = dest_host
    self._dest_port = dest_port
    self._source = source
    self._source_args = source_args

  def _StartDaemon(self):
    """Starts the export daemon.

    """
    return self._lu.rpc.call_export_start(self.node_uuid, self._opts,
                                          self._dest_host, self._dest_port,
                                          self._instance, self._component,
                                          (self._source, self._source_args))

  def CheckListening(self):
    """Checks whether the daemon is listening.

    """
    # Only an import can be listening; an export is trivially "listening"
    # so the main loop proceeds straight to the connection check.  The
    # excerpt had lost this return, leaving an implicit None (falsy),
    # which would make the loop treat exports as never listening.
    return True

  def _GetConnectedCheckEpoch(self):
    """Returns the time since the daemon started.

    """
    assert self._ts_begin is not None

    return self._ts_begin
def FormatProgress(progress):
  """Formats progress information for user consumption.

  @type progress: tuple
  @param progress: 4-tuple of (mbytes, throughput, percent, eta); percent
    and eta may be None
  @rtype: string

  """
  (mbytes, throughput, percent, eta) = progress

  # The excerpt had lost the list construction (NameError as shown) and
  # the None guard for the ETA; percent was already guarded, so ETA is
  # guarded the same way here.
  parts = [
    utils.FormatUnit(mbytes, "h"),

    # Not using FormatUnit as it doesn't support kilobytes
    "%0.1f MiB/s" % throughput,
    ]

  if percent is not None:
    parts.append("%d%%" % percent)

  if eta is not None:
    parts.append("ETA %s" % utils.FormatSeconds(eta))

  return utils.CommaJoin(parts)
class ImportExportLoop:
  # NOTE(review): this excerpt elides a number of lines from this class
  # (the MIN_DELAY/MAX_DELAY constants, "self._lu"/"self._queue"
  # initialization, @staticmethod decorators, the "def Run(self):" line,
  # the surrounding "while True:"/try/except structure and several
  # "return"/"continue"/"break" statements plus the final sleep);
  # confirm against the complete file.
  def __init__(self, lu):
    """Initializes this class.

    @param lu: Logical unit instance

    """
    # Objects added via Add() are staged here until _AddPendingToQueue()
    self._pending_add = []

  def Add(self, diskie):
    """Adds an import/export object to the loop.

    @type diskie: Subclass of L{_DiskImportExportBase}
    @param diskie: Import/export object

    """
    assert diskie not in self._pending_add
    assert diskie.loop is None

    # Adding new objects to a staging list is necessary, otherwise the main
    # loop gets confused if callbacks modify the queue while the main loop is
    # iterating over it
    self._pending_add.append(diskie)

  # NOTE(review): presumably decorated with @staticmethod (no "self")
  def _CollectDaemonStatus(lu, daemons):
    """Collects the status for all import/export daemons.

    @rtype: dict
    @return: Mapping of node name to a dict of daemon name to status data

    """
    for node_name, names in daemons.iteritems():
      result = lu.rpc.call_impexp_status(node_name, names)
      # NOTE(review): presumably guarded by "if result.fail_msg:"
        lu.LogWarning("Failed to get daemon status on %s: %s",
                      node_name, result.fail_msg)

      assert len(names) == len(result.payload)

      daemon_status[node_name] = dict(zip(names, result.payload))

  # NOTE(review): presumably decorated with @staticmethod (no "self")
  def _GetActiveDaemonNames(queue):
    """Gets the names of all active daemons.

    @rtype: dict
    @return: Mapping of node name to a list of daemon names

    """
    if not diskie.active:

      # Start daemon if necessary
      daemon_name = diskie.CheckDaemon()
    except _ImportExportError, err:
      logging.exception("%s failed", diskie.MODE_TEXT)
      diskie.Finalize(error=str(err))

      result.setdefault(diskie.node_name, []).append(daemon_name)

    assert len(queue) >= len(result)
    assert len(queue) >= sum([len(names) for names in result.itervalues()])

    logging.debug("daemons=%r", result)

  def _AddPendingToQueue(self):
    """Adds all pending import/export objects to the internal queue.

    """
    assert compat.all(diskie not in self._queue and diskie.loop == self
                      for diskie in self._pending_add)

    self._queue.extend(self._pending_add)

    del self._pending_add[:]

  # NOTE(review): "def Run(self):" line missing from this excerpt
    """Utility main loop.

    """
    self._AddPendingToQueue()

    # Collect all active daemon names
    daemons = self._GetActiveDaemonNames(self._queue)

    # Collect daemon status data
    data = self._CollectDaemonStatus(self._lu, daemons)

    # Start with the maximum delay and let each check shorten it
    delay = self.MAX_DELAY
    for diskie in self._queue:
      if not diskie.active:

        all_daemon_data = data[diskie.node_name]
        result = diskie.SetDaemonData(False, None)
          diskie.SetDaemonData(True,
                               all_daemon_data[diskie.GetDaemonName()])

          # Daemon not yet ready, retry soon
          delay = min(3.0, delay)

        if diskie.CheckFinished():

        # Normal case: check again in 5 seconds
        delay = min(5.0, delay)

        if not diskie.CheckListening():
          # Not yet listening, retry soon
          delay = min(1.0, delay)

        if not diskie.CheckConnected():
          # Not yet connected, retry soon
          delay = min(1.0, delay)

      except _ImportExportError, err:
        logging.exception("%s failed", diskie.MODE_TEXT)
        diskie.Finalize(error=str(err))

    if not compat.any(diskie.active for diskie in self._queue):

    # Clamp delay between MIN_DELAY and MAX_DELAY before sleeping
    delay = min(self.MAX_DELAY, max(self.MIN_DELAY, delay))
    logging.debug("Waiting for %ss", delay)

  def FinalizeAll(self):
    """Finalizes all pending transfers.

    """
    for diskie in self._queue:
      success = diskie.Finalize() and success
class _TransferInstCbBase(ImportExportCbBase):
  """Base class for the source/destination callbacks of an instance-data
  transfer.

  """
  def __init__(self, lu, feedback_fn, instance, timeouts, src_node_uuid,
               src_cbs, dest_node_uuid, dest_ip):
    """Initializes this class.

    @param lu: Logical unit instance
    @param feedback_fn: Feedback function
    @type instance: L{objects.Instance}
    @param instance: Instance object
    @type timeouts: L{ImportExportTimeouts}
    @param timeouts: Timeouts for this import
    @type src_node_uuid: string
    @param src_node_uuid: Source node UUID
    @param src_cbs: Source-side callbacks (or None)
    @type dest_node_uuid: string
    @param dest_node_uuid: Destination node UUID
    @type dest_ip: string
    @param dest_ip: Destination node IP address

    """
    ImportExportCbBase.__init__(self)

    # Subclasses read self.lu for RPC and configuration access; the
    # excerpt had dropped this assignment, leaving "lu" unused
    self.lu = lu
    self.feedback_fn = feedback_fn
    self.instance = instance
    self.timeouts = timeouts
    self.src_node_uuid = src_node_uuid
    self.src_cbs = src_cbs
    self.dest_node_uuid = dest_node_uuid
    self.dest_ip = dest_ip
class _TransferInstSourceCb(_TransferInstCbBase):
  # NOTE(review): this excerpt elides some lines (a progress guard, the
  # "if ie.success:"/"else:" around the feedback messages and the
  # invocation of the finished callback); confirm against the full file.
  def ReportConnected(self, ie, dtp):
    """Called when a connection has been established.

    """
    # The source side itself has no further source callbacks
    assert self.src_cbs is None
    assert dtp.src_export == ie
    assert dtp.dest_import

    self.feedback_fn("%s is sending data on %s" %
                     (dtp.data.name, ie.node_name))

  def ReportProgress(self, ie, dtp):
    """Called when new progress information should be reported.

    """
    progress = ie.progress
    # NOTE(review): presumably guarded by "if not progress: return"
    self.feedback_fn("%s sent %s" % (dtp.data.name, FormatProgress(progress)))

  def ReportFinished(self, ie, dtp):
    """Called when a transfer has finished.

    """
    assert self.src_cbs is None
    assert dtp.src_export == ie
    assert dtp.dest_import

    # NOTE(review): presumably "if ie.success:" / "else:" around these
      self.feedback_fn("%s finished sending data" % dtp.data.name)
      self.feedback_fn("%s failed to send data: %s (recent output: %s)" %
                       (dtp.data.name, ie.final_message, ie.recent_output))

    dtp.RecordResult(ie.success)

    # NOTE(review): the "if cb: cb(...)" invocation appears to be elided
    cb = dtp.data.finished_fn

    # TODO: Check whether sending SIGTERM right away is okay, maybe we should
    # give the daemon a moment to sort things out
    if dtp.dest_import and not ie.success:
      dtp.dest_import.Abort()
class _TransferInstDestCb(_TransferInstCbBase):
  # NOTE(review): this excerpt elides some lines (registration of the
  # started export with the loop, an argument of a feedback call and the
  # "if ie.success:"/"else:" guards); confirm against the full file.
  def ReportListening(self, ie, dtp, component):
    """Called when daemon started listening.

    """
    assert dtp.src_export is None
    assert dtp.dest_import
    assert dtp.export_opts

    self.feedback_fn("%s is now listening, starting export" % dtp.data.name)

    # Start export on source node
    de = DiskExport(self.lu, self.src_node_uuid, dtp.export_opts,
                    self.dest_ip, ie.listen_port, self.instance,
                    component, dtp.data.src_io, dtp.data.src_ioargs,
                    self.timeouts, self.src_cbs, private=dtp)
    # NOTE(review): the lines wiring "de" up (presumably storing it on
    # dtp.src_export and adding it to the loop) appear to be elided

  def ReportConnected(self, ie, dtp):
    """Called when a connection has been established.

    """
    # NOTE(review): the first format argument line (presumably
    # "dtp.data.name") is elided from this call
    self.feedback_fn("%s is receiving data on %s" %
                     self.lu.cfg.GetNodeName(self.dest_node_uuid)))

  def ReportFinished(self, ie, dtp):
    """Called when a transfer has finished.

    """
    # NOTE(review): presumably "if ie.success:" / "else:" around these
      self.feedback_fn("%s finished receiving data" % dtp.data.name)
      self.feedback_fn("%s failed to receive data: %s (recent output: %s)" %
                       (dtp.data.name, ie.final_message, ie.recent_output))

    dtp.RecordResult(ie.success)

    # TODO: Check whether sending SIGTERM right away is okay, maybe we should
    # give the daemon a moment to sort things out
    if dtp.src_export and not ie.success:
      dtp.src_export.Abort()
class DiskTransfer(object):
  """Description of a single disk transfer (see L{TransferInstanceData}).

  """
  def __init__(self, name, src_io, src_ioargs, dest_io, dest_ioargs,
               finished_fn):
    """Initializes this class.

    @type name: string
    @param name: User-visible name for this transfer (e.g. "disk/0")
    @param src_io: Source I/O type
    @param src_ioargs: Source I/O arguments
    @param dest_io: Destination I/O type
    @param dest_ioargs: Destination I/O arguments
    @type finished_fn: callable
    @param finished_fn: Function called once transfer has finished

    """
    # Store every parameter; the excerpt had lost the "finished_fn"
    # signature line and the "name"/"src_io" assignments although the
    # transfer callbacks read dtp.data.name and dtp.data.src_io
    self.name = name
    self.src_io = src_io
    self.src_ioargs = src_ioargs

    self.dest_io = dest_io
    self.dest_ioargs = dest_ioargs

    self.finished_fn = finished_fn
966 class _DiskTransferPrivate(object):
967 def __init__(self, data, success, export_opts):
968 """Initializes this class.
970 @type data: L{DiskTransfer}
975 self.success = success
976 self.export_opts = export_opts
978 self.src_export = None
979 self.dest_import = None
981 def RecordResult(self, success):
982 """Updates the status.
984 One failed part will cause the whole transfer to fail.
987 self.success = self.success and success
def _GetInstDiskMagic(base, instance_name, index):
  """Computes the magic value for a disk export or import.

  @type base: string
  @param base: Random seed value (can be the same for all disks of a transfer)
  @type instance_name: string
  @param instance_name: Name of instance
  @type index: number
  @param index: Disk index

  """
  h = compat.sha1_hash()
  h.update(str(constants.RIE_VERSION))
  # Mix in the per-transfer random seed; the excerpt had dropped this
  # line, leaving "base" unused and the magic predictable from the
  # instance name and disk index alone
  h.update(base)
  h.update(instance_name)
  h.update(str(index))
  return h.hexdigest()
def TransferInstanceData(lu, feedback_fn, src_node_uuid, dest_node_uuid,
                         dest_ip, instance, all_transfers):
  """Transfers an instance's data from one node to another.

  @param lu: Logical unit instance
  @param feedback_fn: Feedback function
  @type src_node_uuid: string
  @param src_node_uuid: Source node UUID
  @type dest_node_uuid: string
  @param dest_node_uuid: Destination node UUID
  @type dest_ip: string
  @param dest_ip: IP address of destination node
  @type instance: L{objects.Instance}
  @param instance: Instance object
  @type all_transfers: list of L{DiskTransfer} instances
  @param all_transfers: List of all disk transfers to be made
  @rtype: list
  @return: List with a boolean (True=successful, False=failed) for success for
    each transfer

  """
  # NOTE(review): this excerpt elides several lines (the "all_dtp" list
  # initialization and appends, "ieloop.Add(di)", the try/finally with
  # "ieloop.Run()", an argument of the dest_cbs call and the per-transfer
  # if/else); confirm against the complete file.

  # Disable compression for all moves as these are all within the same cluster
  compress = constants.IEC_NONE

  src_node_name = lu.cfg.GetNodeName(src_node_uuid)
  dest_node_name = lu.cfg.GetNodeName(dest_node_uuid)

  logging.debug("Source node %s, destination node %s, compression '%s'",
                src_node_name, dest_node_name, compress)

  timeouts = ImportExportTimeouts(constants.DISK_TRANSFER_CONNECT_TIMEOUT)
  src_cbs = _TransferInstSourceCb(lu, feedback_fn, instance, timeouts,
                                  src_node_uuid, None, dest_node_uuid, dest_ip)
  dest_cbs = _TransferInstDestCb(lu, feedback_fn, instance, timeouts,
                                 src_node_uuid, src_cbs, dest_node_uuid,

  # Random seed shared by the magic values of all disks of this transfer
  base_magic = utils.GenerateSecret(6)

  ieloop = ImportExportLoop(lu)
  for idx, transfer in enumerate(all_transfers):
    feedback_fn("Exporting %s from %s to %s" %
                (transfer.name, src_node_name, dest_node_name))

    magic = _GetInstDiskMagic(base_magic, instance.name, idx)
    opts = objects.ImportExportOptions(key_name=None, ca_pem=None,
                                       compress=compress, magic=magic)

    dtp = _DiskTransferPrivate(transfer, True, opts)

    di = DiskImport(lu, dest_node_uuid, opts, instance, "disk%d" % idx,
                    transfer.dest_io, transfer.dest_ioargs,
                    timeouts, dest_cbs, private=dtp)

    dtp.dest_import = di

    # NOTE(review): presumably the else-branch placeholder for empty
    # (falsy) transfers
    dtp = _DiskTransferPrivate(None, False, None)

  ieloop.FinalizeAll()

  assert len(all_dtp) == len(all_transfers)

  assert compat.all((dtp.src_export is None or
                     dtp.src_export.success is not None) and
                    (dtp.dest_import is None or
                     dtp.dest_import.success is not None)
                    for dtp in all_dtp), \
         "Not all imports/exports are finalized"

  return [bool(dtp.success) for dtp in all_dtp]
class _RemoteExportCb(ImportExportCbBase):
  # NOTE(review): this excerpt elides some lines (a @property decorator,
  # the "(idx, _) = private" unpackings, a progress guard and the
  # "if ie.success:"/"else:" structure plus the finished_fn invocation);
  # confirm against the complete file.
  def __init__(self, feedback_fn, disk_count):
    """Initializes this class.

    @param feedback_fn: Feedback function
    @type disk_count: number
    @param disk_count: Number of disks

    """
    ImportExportCbBase.__init__(self)
    self._feedback_fn = feedback_fn
    # One result slot per disk; None until the disk's transfer finishes
    self._dresults = [None] * disk_count

  # NOTE(review): presumably decorated with @property
  def disk_results(self):
    """Returns per-disk results.

    """
    return self._dresults

  def ReportConnected(self, ie, private):
    """Called when a connection has been established.

    """
    self._feedback_fn("Disk %s is now sending data" % idx)

  def ReportProgress(self, ie, private):
    """Called when new progress information should be reported.

    """
    progress = ie.progress
    self._feedback_fn("Disk %s sent %s" % (idx, FormatProgress(progress)))

  def ReportFinished(self, ie, private):
    """Called when a transfer has finished.

    """
    (idx, finished_fn) = private

    # NOTE(review): presumably "if ie.success:" / "else:" around these
      self._feedback_fn("Disk %s finished sending data" % idx)
      self._feedback_fn("Disk %s failed to send data: %s (recent output: %s)" %
                        (idx, ie.final_message, ie.recent_output))

    self._dresults[idx] = bool(ie.success)
class ExportInstanceHelper:
  # NOTE(review): this excerpt elides several lines from this class
  # (e.g. "self._lu = lu", various guard conditions and else-branches,
  # "transfers = []", continuation lines of some calls, the try/finally
  # with "ieloop.Run()" and the "def Cleanup(self):" line); confirm
  # against the complete file.
  def __init__(self, lu, feedback_fn, instance):
    """Initializes this class.

    @param lu: Logical unit instance
    @param feedback_fn: Feedback function
    @type instance: L{objects.Instance}
    @param instance: Instance object

    """
    self._feedback_fn = feedback_fn
    self._instance = instance

    # Snapshot disks (filled by CreateSnapshots) and per-disk flags
    # recording whether each snapshot has been removed again
    self._snap_disks = []
    self._removed_snaps = [False] * len(instance.disks)

  def CreateSnapshots(self):
    """Creates an LVM snapshot for every disk of the instance.

    """
    assert not self._snap_disks

    instance = self._instance
    src_node = instance.primary_node

    for idx, disk in enumerate(instance.disks):
      # NOTE(review): format-argument continuation line elided
      self._feedback_fn("Creating a snapshot of disk/%s on node %s" %

      # result.payload will be a snapshot of an lvm leaf of the one we
      # passed in
      result = self._lu.rpc.call_blockdev_snapshot(src_node, (disk, instance))
      msg = result.fail_msg
      # NOTE(review): presumably guarded by "if msg:" with continuation
      # arguments elided
        self._lu.LogWarning("Could not snapshot disk/%s on node %s: %s",
      elif (not isinstance(result.payload, (tuple, list)) or
            len(result.payload) != 2):
        self._lu.LogWarning("Could not snapshot disk/%s on node %s: invalid"
                            " result '%s'", idx, src_node, result.payload)
      # NOTE(review): presumably an "else:" branch building the snapshot
      # disk object; the objects.Disk call is missing its trailing
      # argument(s)
        disk_id = tuple(result.payload)
        disk_params = constants.DISK_DT_DEFAULTS[constants.DT_PLAIN].copy()
        new_dev = objects.Disk(dev_type=constants.DT_PLAIN, size=disk.size,
                               logical_id=disk_id, physical_id=disk_id,
                               iv_name=disk.iv_name,
      self._snap_disks.append(new_dev)

    assert len(self._snap_disks) == len(instance.disks)
    assert len(self._removed_snaps) == len(instance.disks)

  def _RemoveSnapshot(self, disk_index):
    """Removes an LVM snapshot.

    @type disk_index: number
    @param disk_index: Index of the snapshot to be removed

    """
    disk = self._snap_disks[disk_index]
    # Do nothing if there is no snapshot or it was already removed
    if disk and not self._removed_snaps[disk_index]:
      src_node = self._instance.primary_node

      self._feedback_fn("Removing snapshot of disk/%s on node %s" %
                        (disk_index, src_node))

      result = self._lu.rpc.call_blockdev_remove(src_node, disk)
      # NOTE(review): presumably "if result.fail_msg:" / "else:" here
        self._lu.LogWarning("Could not remove snapshot for disk/%d from node"
                            " %s: %s", disk_index, src_node, result.fail_msg)
        self._removed_snaps[disk_index] = True

  def LocalExport(self, dest_node):
    """Intra-cluster instance export.

    @type dest_node: L{objects.Node}
    @param dest_node: Destination node

    """
    instance = self._instance
    src_node_uuid = instance.primary_node

    assert len(self._snap_disks) == len(instance.disks)

    for idx, dev in enumerate(self._snap_disks):
      # NOTE(review): presumably inside an "if not dev:" guard, keeping
      # the transfers list aligned with instance.disks
      transfers.append(None)

      # NOTE(review): trailing argument of this PathJoin call elided
      path = utils.PathJoin(pathutils.EXPORT_DIR, "%s.new" % instance.name,

      finished_fn = compat.partial(self._TransferFinished, idx)

      # FIXME: pass debug option from opcode to backend
      dt = DiskTransfer("snapshot/%s" % idx,
                        constants.IEIO_SCRIPT, (dev, idx),
                        constants.IEIO_FILE, (path, ),
      transfers.append(dt)

    # Actually export data
    dresults = TransferInstanceData(self._lu, self._feedback_fn,
                                    src_node_uuid, dest_node.uuid,
                                    dest_node.secondary_ip,
                                    instance, transfers)

    assert len(dresults) == len(instance.disks)

    self._feedback_fn("Finalizing export on %s" % dest_node.name)
    # NOTE(review): trailing argument of this RPC call elided
    result = self._lu.rpc.call_finalize_export(dest_node.uuid, instance,
    msg = result.fail_msg
    # NOTE(review): "fin_resu" assignment and "if msg:" guard elided
      self._lu.LogWarning("Could not finalize export for instance %s"
                          " on node %s: %s", instance.name, dest_node.name, msg)

    return (fin_resu, dresults)

  def RemoteExport(self, disk_info, key_name, dest_ca_pem, timeouts):
    """Inter-cluster instance export.

    @type disk_info: list
    @param disk_info: Per-disk destination information
    @type key_name: string
    @param key_name: Name of X509 key to use
    @type dest_ca_pem: string
    @param dest_ca_pem: Destination X509 CA in PEM format
    @type timeouts: L{ImportExportTimeouts}
    @param timeouts: Timeouts for this import

    """
    instance = self._instance

    assert len(disk_info) == len(instance.disks)

    cbs = _RemoteExportCb(self._feedback_fn, len(instance.disks))

    ieloop = ImportExportLoop(self._lu)
    # NOTE(review): the second zip argument (presumably disk_info) and
    # the "ca_pem=dest_ca_pem" option are elided below; as shown,
    # dest_ca_pem would be unused
    for idx, (dev, (host, port, magic)) in enumerate(zip(instance.disks,
      # Decide whether to use IPv6
      ipv6 = netutils.IP6Address.IsValid(host)

      opts = objects.ImportExportOptions(key_name=key_name,
                                         magic=magic, ipv6=ipv6)

      self._feedback_fn("Sending disk %s to %s:%s" % (idx, host, port))
      finished_fn = compat.partial(self._TransferFinished, idx)
      ieloop.Add(DiskExport(self._lu, instance.primary_node,
                            opts, host, port, instance, "disk%d" % idx,
                            constants.IEIO_SCRIPT, (dev, idx),
                            timeouts, cbs, private=(idx, finished_fn)))

    ieloop.FinalizeAll()

    return (True, cbs.disk_results)

  def _TransferFinished(self, idx):
    """Called once a transfer has finished.

    @type idx: number
    @param idx: Disk index

    """
    logging.debug("Transfer %s finished", idx)
    self._RemoveSnapshot(idx)

  # NOTE(review): "def Cleanup(self):" line missing from this excerpt
    """Remove all snapshots.

    """
    assert len(self._removed_snaps) == len(self._instance.disks)
    for idx in range(len(self._instance.disks)):
      self._RemoveSnapshot(idx)
class _RemoteImportCb(ImportExportCbBase):
  # NOTE(review): this excerpt elides some lines (the trailing
  # "external_address" parameter of __init__, "self._cds = cds", a
  # @property decorator, the "(idx, ) = private" unpackings, an early
  # return, the "disks" list initialization, parts of the
  # ELOG_REMOTE_IMPORT payload and success guards); confirm against the
  # complete file.
  def __init__(self, feedback_fn, cds, x509_cert_pem, disk_count,
    """Initializes this class.

    @param feedback_fn: Feedback function
    @type cds: string
    @param cds: Cluster domain secret
    @type x509_cert_pem: string
    @param x509_cert_pem: CA used for signing import key
    @type disk_count: number
    @param disk_count: Number of disks
    @type external_address: string
    @param external_address: External address of destination node

    """
    ImportExportCbBase.__init__(self)
    self._feedback_fn = feedback_fn
    self._x509_cert_pem = x509_cert_pem
    self._disk_count = disk_count
    self._external_address = external_address

    # Per-disk results and the (port, magic) of each listening daemon
    self._dresults = [None] * disk_count
    self._daemon_port = [None] * disk_count

    # Salt used when computing the remote import disk information
    self._salt = utils.GenerateSecret(8)

  # NOTE(review): presumably decorated with @property
  def disk_results(self):
    """Returns per-disk results.

    """
    return self._dresults

  def _CheckAllListening(self):
    """Checks whether all daemons are listening.

    If all daemons are listening, the information is sent to the client.

    """
    # Wait until every disk's daemon has reported its (port, magic)
    if not compat.all(dp is not None for dp in self._daemon_port):

    host = self._external_address

    for idx, (port, magic) in enumerate(self._daemon_port):
      disks.append(ComputeRemoteImportDiskInfo(self._cds, self._salt,
                                               idx, host, port, magic))

    assert len(disks) == self._disk_count

    self._feedback_fn(constants.ELOG_REMOTE_IMPORT, {
      "x509_ca": self._x509_cert_pem,

  def ReportListening(self, ie, private, _):
    """Called when daemon started listening.

    """
    self._feedback_fn("Disk %s is now listening" % idx)

    assert self._daemon_port[idx] is None

    self._daemon_port[idx] = (ie.listen_port, ie.magic)

    self._CheckAllListening()

  def ReportConnected(self, ie, private):
    """Called when a connection has been established.

    """
    self._feedback_fn("Disk %s is now receiving data" % idx)

  def ReportFinished(self, ie, private):
    """Called when a transfer has finished.

    """
    # Daemon is certainly no longer listening
    self._daemon_port[idx] = None

    # NOTE(review): presumably "if ie.success:" / "else:" around these
      self._feedback_fn("Disk %s finished receiving data" % idx)
      self._feedback_fn(("Disk %s failed to receive data: %s"
                         " (recent output: %s)") %
                        (idx, ie.final_message, ie.recent_output))

    self._dresults[idx] = bool(ie.success)
def RemoteImport(lu, feedback_fn, instance, pnode, source_x509_ca,
                 cds, timeouts):
  """Imports an instance from another cluster.

  @param lu: Logical unit instance
  @param feedback_fn: Feedback function
  @type instance: L{objects.Instance}
  @param instance: Instance object
  @type pnode: L{objects.Node}
  @param pnode: Primary node of instance as an object
  @type source_x509_ca: OpenSSL.crypto.X509
  @param source_x509_ca: Import source's X509 CA
  @type cds: string
  @param cds: Cluster domain secret
  @type timeouts: L{ImportExportTimeouts}
  @param timeouts: Timeouts for this import
  @return: Per-disk results (list of booleans)

  """
  source_ca_pem = OpenSSL.crypto.dump_certificate(OpenSSL.crypto.FILETYPE_PEM,
                                                  source_x509_ca)

  magic_base = utils.GenerateSecret(6)

  # Decide whether to use IPv6
  ipv6 = netutils.IP6Address.IsValid(pnode.primary_ip)

  # Create crypto key on the instance's primary node
  result = lu.rpc.call_x509_cert_create(instance.primary_node,
                                        constants.RIE_CERT_VALIDITY)
  result.Raise("Can't create X509 key and certificate on %s" % result.node)

  (x509_key_name, x509_cert_pem) = result.payload

  # Sign the new certificate with the cluster domain secret so that the
  # source cluster can verify it
  x509_cert = OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_PEM,
                                              x509_cert_pem)
  signed_x509_cert_pem = \
    utils.SignX509Certificate(x509_cert, cds, utils.GenerateSecret(8))

  cbs = _RemoteImportCb(feedback_fn, cds, signed_x509_cert_pem,
                        len(instance.disks), pnode.primary_ip)

  ieloop = ImportExportLoop(lu)
  try:
    for idx, dev in enumerate(instance.disks):
      magic = _GetInstDiskMagic(magic_base, instance.name, idx)

      # Import daemon options
      opts = objects.ImportExportOptions(key_name=x509_key_name,
                                         ca_pem=source_ca_pem,
                                         magic=magic, ipv6=ipv6)

      ieloop.Add(DiskImport(lu, instance.primary_node, opts, instance,
                            "disk%d" % idx,
                            constants.IEIO_SCRIPT, (dev, idx),
                            timeouts, cbs, private=(idx, )))

    ieloop.Run()
  finally:
    # Always shut down the import daemons, even on errors
    ieloop.FinalizeAll()

  # Remove crypto key and certificate
  result = lu.rpc.call_x509_cert_remove(instance.primary_node, x509_key_name)
  result.Raise("Can't remove X509 key and certificate on %s" % result.node)

  return cbs.disk_results
def _GetImportExportHandshakeMessage(version):
  """Builds the plain-text handshake message for a RIE protocol version.

  @type version: number
  @param version: RIE protocol version
  @rtype: string
  @return: Message of the form "<version>:<handshake constant>"

  """
  msg_parts = (version, constants.RIE_HANDSHAKE)
  return "%s:%s" % msg_parts
def ComputeRemoteExportHandshake(cds):
  """Computes the remote import/export handshake.

  @type cds: string
  @param cds: Cluster domain secret
  @return: (version, HMAC digest, salt) tuple to send to the peer

  """
  version = constants.RIE_VERSION
  hmac_salt = utils.GenerateSecret(8)
  hmac_digest = utils.Sha1Hmac(cds, _GetImportExportHandshakeMessage(version),
                               salt=hmac_salt)
  return (version, hmac_digest, hmac_salt)
def CheckRemoteExportHandshake(cds, handshake):
  """Checks the handshake of a remote import/export.

  @type cds: string
  @param cds: Cluster domain secret
  @type handshake: sequence
  @param handshake: Handshake sent by remote peer
  @return: C{None} on success, an error description otherwise

  """
  try:
    (version, hmac_digest, hmac_salt) = handshake
  # "as" form is valid on Python 2.6+ and required on Python 3
  except (TypeError, ValueError) as err:
    return "Invalid data: %s" % err

  # Verify the HMAC before trusting any field, including the version
  if not utils.VerifySha1Hmac(cds, _GetImportExportHandshakeMessage(version),
                              hmac_digest, salt=hmac_salt):
    return "Hash didn't match, clusters don't share the same domain secret"

  if version != constants.RIE_VERSION:
    return ("Clusters don't have the same remote import/export protocol"
            " (local=%s, remote=%s)" %
            (constants.RIE_VERSION, version))

  return None
1546 def _GetRieDiskInfoMessage(disk_index, host, port, magic):
1547 """Returns the hashed text for import/export disk information.
1549 @type disk_index: number
1550 @param disk_index: Index of disk (included in hash)
1552 @param host: Hostname
1554 @param port: Daemon port
1556 @param magic: Magic value
1559 return "%s:%s:%s:%s" % (disk_index, host, port, magic)
def CheckRemoteExportDiskInfo(cds, disk_index, disk_info):
  """Verifies received disk information for an export.

  @type cds: string
  @param cds: Cluster domain secret
  @type disk_index: number
  @param disk_index: Index of disk (included in hash)
  @type disk_info: sequence
  @param disk_info: Disk information sent by remote peer
  @return: (destination, port, magic) tuple
  @raise errors.GenericError: if the data is malformed or the HMAC is wrong

  """
  try:
    (host, port, magic, hmac_digest, hmac_salt) = disk_info
  # "as" form is valid on Python 2.6+ and required on Python 3
  except (TypeError, ValueError) as err:
    raise errors.GenericError("Invalid data: %s" % err)

  if not (host and port and magic):
    raise errors.GenericError("Missing destination host, port or magic")

  msg = _GetRieDiskInfoMessage(disk_index, host, port, magic)

  if not utils.VerifySha1Hmac(cds, msg, hmac_digest, salt=hmac_salt):
    raise errors.GenericError("HMAC is wrong")

  # Literal IP addresses are used as-is; anything else is treated as a
  # hostname and normalized
  if netutils.IP6Address.IsValid(host) or netutils.IP4Address.IsValid(host):
    destination = host
  else:
    destination = netutils.Hostname.GetNormalizedName(host)

  return (destination,
          utils.ValidateServiceName(port),
          magic)
def ComputeRemoteImportDiskInfo(cds, salt, disk_index, host, port, magic):
  """Computes the signed disk information for a remote import.

  @type cds: string
  @param cds: Cluster domain secret
  @type salt: string
  @param salt: HMAC salt
  @type disk_index: number
  @param disk_index: Index of disk (included in hash)
  @type host: string
  @param host: Hostname
  @type port: number
  @param port: Daemon port
  @type magic: string
  @param magic: Magic value
  @return: (host, port, magic, HMAC digest, salt) tuple

  """
  info_msg = _GetRieDiskInfoMessage(disk_index, host, port, magic)
  digest = utils.Sha1Hmac(cds, info_msg, salt=salt)
  return (host, port, magic, digest, salt)
def CalculateGroupIPolicy(cluster, group):
  """Calculate the effective instance policy for a node group.

  The group's policy overrides fill in on top of the cluster-wide defaults.

  @param cluster: Cluster object providing L{SimpleFillIPolicy}
  @param group: Node group object whose C{ipolicy} is applied

  """
  group_ipolicy = group.ipolicy
  return cluster.SimpleFillIPolicy(group_ipolicy)
def ComputeDiskSize(disk_template, disks):
  """Compute disk size requirements according to disk template.

  @type disk_template: string
  @param disk_template: Disk template (one of the C{constants.DT_*} values)
  @type disks: list of dicts
  @param disks: Disk definitions; each must contain C{constants.IDISK_SIZE}
  @return: Required free disk space in mebibytes
  @raise errors.ProgrammerError: for an unknown disk template

  """
  # Required free disk space as a function of disk and swap space
  req_size_dict = {
    constants.DT_DISKLESS: 0,
    constants.DT_PLAIN: sum(d[constants.IDISK_SIZE] for d in disks),
    # 128 MB are added for drbd metadata for each disk
    constants.DT_DRBD8:
      sum(d[constants.IDISK_SIZE] + constants.DRBD_META_SIZE for d in disks),
    constants.DT_FILE: sum(d[constants.IDISK_SIZE] for d in disks),
    constants.DT_SHARED_FILE: sum(d[constants.IDISK_SIZE] for d in disks),
    constants.DT_BLOCK: 0,
    constants.DT_RBD: sum(d[constants.IDISK_SIZE] for d in disks),
    constants.DT_EXT: sum(d[constants.IDISK_SIZE] for d in disks),
    }

  if disk_template not in req_size_dict:
    raise errors.ProgrammerError("Disk template '%s' size requirement"
                                 " is unknown" % disk_template)

  return req_size_dict[disk_template]