4 # Copyright (C) 2010 Google Inc.
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 # General Public License for more details.
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
22 """Instance-related functions and classes for masterd.
30 from ganeti import constants
31 from ganeti import errors
32 from ganeti import compat
33 from ganeti import utils
34 from ganeti import objects
35 from ganeti import netutils
class _ImportExportError(Exception):
  """Local exception to report import/export errors.

  """
class ImportExportTimeouts(object):
  """Timeouts used by the disk import/export machinery.

  Instances of this class are passed around as a single bundle of all
  timeout values; consumers read the individual attributes (e.g.
  C{timeouts.ready}, C{timeouts.listen}).

  """
  #: Time until daemon starts writing status file
  DEFAULT_READY_TIMEOUT = 10

  #: Length of time until errors cause hard failure
  DEFAULT_ERROR_TIMEOUT = 10

  #: Time after which daemon must be listening
  DEFAULT_LISTEN_TIMEOUT = 10

  #: Progress update interval
  DEFAULT_PROGRESS_INTERVAL = 60

  def __init__(self, connect,
               listen=DEFAULT_LISTEN_TIMEOUT,
               error=DEFAULT_ERROR_TIMEOUT,
               ready=DEFAULT_READY_TIMEOUT,
               progress=DEFAULT_PROGRESS_INTERVAL):
    """Initializes this class.

    @type connect: number
    @param connect: Timeout for establishing connection
    @type listen: number
    @param listen: Timeout for starting to listen for connections
    @type error: number
    @param error: Length of time until errors cause hard failure
    @type ready: number
    @param ready: Timeout for daemon to become ready
    @type progress: number
    @param progress: Progress update interval

    """
    # All parameters must be stored; other code reads each of these
    # attributes (e.g. timeouts.ready, timeouts.error, timeouts.listen)
    self.connect = connect
    self.listen = listen
    self.error = error
    self.ready = ready
    self.progress = progress
class ImportExportCbBase(object):
  """Callbacks for disk import/export.

  All callbacks are no-ops by default; subclasses override the events
  they care about.

  """
  def ReportListening(self, ie, private):
    """Called when daemon started listening.

    @type ie: Subclass of L{_DiskImportExportBase}
    @param ie: Import/export object
    @param private: Private data passed to import/export object

    """

  def ReportConnected(self, ie, private):
    """Called when a connection has been established.

    @type ie: Subclass of L{_DiskImportExportBase}
    @param ie: Import/export object
    @param private: Private data passed to import/export object

    """

  def ReportProgress(self, ie, private):
    """Called when new progress information should be reported.

    @type ie: Subclass of L{_DiskImportExportBase}
    @param ie: Import/export object
    @param private: Private data passed to import/export object

    """

  def ReportFinished(self, ie, private):
    """Called when a transfer has finished.

    @type ie: Subclass of L{_DiskImportExportBase}
    @param ie: Import/export object
    @param private: Private data passed to import/export object

    """
132 def _TimeoutExpired(epoch, timeout, _time_fn=time.time):
133 """Checks whether a timeout has expired.
136 return _time_fn() > (epoch + timeout)
139 class _DiskImportExportBase(object):
142 def __init__(self, lu, node_name, opts,
143 instance, timeouts, cbs, private=None):
144 """Initializes this class.
146 @param lu: Logical unit instance
147 @type node_name: string
148 @param node_name: Node name for import
149 @type opts: L{objects.ImportExportOptions}
150 @param opts: Import/export daemon options
151 @type instance: L{objects.Instance}
152 @param instance: Instance object
153 @type timeouts: L{ImportExportTimeouts}
154 @param timeouts: Timeouts for this import
155 @type cbs: L{ImportExportCbBase}
156 @param cbs: Callbacks
157 @param private: Private data for callback functions
160 assert self.MODE_TEXT
163 self.node_name = node_name
164 self._opts = opts.Copy()
165 self._instance = instance
166 self._timeouts = timeouts
168 self._private = private
170 # Set master daemon's timeout in options for import/export daemon
171 assert self._opts.connect_timeout is None
172 self._opts.connect_timeout = timeouts.connect
178 self._ts_begin = None
179 self._ts_connected = None
180 self._ts_finished = None
181 self._ts_cleanup = None
182 self._ts_last_progress = None
183 self._ts_last_error = None
187 self.final_message = None
190 self._daemon_name = None
194 def recent_output(self):
195 """Returns the most recent output from the daemon.
199 return self._daemon.recent_output
205 """Returns transfer progress information.
211 return (self._daemon.progress_mbytes,
212 self._daemon.progress_throughput,
213 self._daemon.progress_percent,
214 self._daemon.progress_eta)
218 """Returns the magic value for this import/export.
221 return self._opts.magic
225 """Determines whether this transport is still active.
228 return self.success is None
232 """Returns parent loop.
234 @rtype: L{ImportExportLoop}
239 def SetLoop(self, loop):
240 """Sets the parent loop.
242 @type loop: L{ImportExportLoop}
246 raise errors.ProgrammerError("Loop can only be set once")
250 def _StartDaemon(self):
251 """Starts the import/export daemon.
254 raise NotImplementedError()
256 def CheckDaemon(self):
257 """Checks whether daemon has been started and if not, starts it.
263 assert self._ts_cleanup is None
265 if self._daemon_name is None:
266 assert self._ts_begin is None
268 result = self._StartDaemon()
270 raise _ImportExportError("Failed to start %s on %s: %s" %
271 (self.MODE_TEXT, self.node_name,
274 daemon_name = result.payload
276 logging.info("Started %s %r on %s", self.MODE_TEXT, daemon_name,
279 self._ts_begin = time.time()
280 self._daemon_name = daemon_name
282 return self._daemon_name
284 def GetDaemonName(self):
285 """Returns the daemon name.
288 assert self._daemon_name, "Daemon has not been started"
289 assert self._ts_cleanup is None
290 return self._daemon_name
293 """Sends SIGTERM to import/export daemon (if still active).
296 if self._daemon_name:
297 self._lu.LogWarning("Aborting %s %r on %s",
298 self.MODE_TEXT, self._daemon_name, self.node_name)
299 result = self._lu.rpc.call_impexp_abort(self.node_name, self._daemon_name)
301 self._lu.LogWarning("Failed to abort %s %r on %s: %s",
302 self.MODE_TEXT, self._daemon_name,
303 self.node_name, result.fail_msg)
308 def _SetDaemonData(self, data):
309 """Internal function for updating status daemon data.
311 @type data: L{objects.ImportExportStatus}
312 @param data: Daemon status data
315 assert self._ts_begin is not None
318 if _TimeoutExpired(self._ts_begin, self._timeouts.ready):
319 raise _ImportExportError("Didn't become ready after %s seconds" %
320 self._timeouts.ready)
328 def SetDaemonData(self, success, data):
329 """Updates daemon status data.
332 @param success: Whether fetching data was successful or not
333 @type data: L{objects.ImportExportStatus}
334 @param data: Daemon status data
338 if self._ts_last_error is None:
339 self._ts_last_error = time.time()
341 elif _TimeoutExpired(self._ts_last_error, self._timeouts.error):
342 raise _ImportExportError("Too many errors while updating data")
346 self._ts_last_error = None
348 return self._SetDaemonData(data)
350 def CheckListening(self):
351 """Checks whether the daemon is listening.
354 raise NotImplementedError()
356 def _GetConnectedCheckEpoch(self):
357 """Returns timeout to calculate connect timeout.
360 raise NotImplementedError()
362 def CheckConnected(self):
363 """Checks whether the daemon is connected.
366 @return: Whether the daemon is connected
369 assert self._daemon, "Daemon status missing"
371 if self._ts_connected is not None:
374 if self._daemon.connected:
375 self._ts_connected = time.time()
377 # TODO: Log remote peer
378 logging.debug("%s %r on %s is now connected",
379 self.MODE_TEXT, self._daemon_name, self.node_name)
381 self._cbs.ReportConnected(self, self._private)
385 if _TimeoutExpired(self._GetConnectedCheckEpoch(), self._timeouts.connect):
386 raise _ImportExportError("Not connected after %s seconds" %
387 self._timeouts.connect)
391 def _CheckProgress(self):
392 """Checks whether a progress update should be reported.
395 if ((self._ts_last_progress is None or
396 _TimeoutExpired(self._ts_last_progress, self._timeouts.progress)) and
398 self._daemon.progress_mbytes is not None and
399 self._daemon.progress_throughput is not None):
400 self._cbs.ReportProgress(self, self._private)
401 self._ts_last_progress = time.time()
403 def CheckFinished(self):
404 """Checks whether the daemon exited.
407 @return: Whether the transfer is finished
410 assert self._daemon, "Daemon status missing"
412 if self._ts_finished:
415 if self._daemon.exit_status is None:
416 # TODO: Adjust delay for ETA expiring soon
417 self._CheckProgress()
420 self._ts_finished = time.time()
422 self._ReportFinished(self._daemon.exit_status == 0,
423 self._daemon.error_message)
427 def _ReportFinished(self, success, message):
428 """Transfer is finished or daemon exited.
431 @param success: Whether the transfer was successful
432 @type message: string
433 @param message: Error message
436 assert self.success is None
438 self.success = success
439 self.final_message = message
442 logging.info("%s %r on %s succeeded", self.MODE_TEXT, self._daemon_name,
444 elif self._daemon_name:
445 self._lu.LogWarning("%s %r on %s failed: %s",
446 self.MODE_TEXT, self._daemon_name, self.node_name,
449 self._lu.LogWarning("%s on %s failed: %s", self.MODE_TEXT,
450 self.node_name, message)
452 self._cbs.ReportFinished(self, self._private)
455 """Makes the RPC call to finalize this import/export.
458 return self._lu.rpc.call_impexp_cleanup(self.node_name, self._daemon_name)
460 def Finalize(self, error=None):
461 """Finalizes this import/export.
464 if self._daemon_name:
465 logging.info("Finalizing %s %r on %s",
466 self.MODE_TEXT, self._daemon_name, self.node_name)
468 result = self._Finalize()
470 self._lu.LogWarning("Failed to finalize %s %r on %s: %s",
471 self.MODE_TEXT, self._daemon_name,
472 self.node_name, result.fail_msg)
475 # Daemon is no longer running
476 self._daemon_name = None
477 self._ts_cleanup = time.time()
480 self._ReportFinished(False, error)
485 class DiskImport(_DiskImportExportBase):
488 def __init__(self, lu, node_name, opts, instance,
489 dest, dest_args, timeouts, cbs, private=None):
490 """Initializes this class.
492 @param lu: Logical unit instance
493 @type node_name: string
494 @param node_name: Node name for import
495 @type opts: L{objects.ImportExportOptions}
496 @param opts: Import/export daemon options
497 @type instance: L{objects.Instance}
498 @param instance: Instance object
499 @param dest: I/O destination
500 @param dest_args: I/O arguments
501 @type timeouts: L{ImportExportTimeouts}
502 @param timeouts: Timeouts for this import
503 @type cbs: L{ImportExportCbBase}
504 @param cbs: Callbacks
505 @param private: Private data for callback functions
508 _DiskImportExportBase.__init__(self, lu, node_name, opts,
509 instance, timeouts, cbs, private)
511 self._dest_args = dest_args
514 self._ts_listening = None
517 def listen_port(self):
518 """Returns the port the daemon is listening on.
522 return self._daemon.listen_port
526 def _StartDaemon(self):
527 """Starts the import daemon.
530 return self._lu.rpc.call_import_start(self.node_name, self._opts,
532 self._dest, self._dest_args)
534 def CheckListening(self):
535 """Checks whether the daemon is listening.
538 @return: Whether the daemon is listening
541 assert self._daemon, "Daemon status missing"
543 if self._ts_listening is not None:
546 port = self._daemon.listen_port
548 self._ts_listening = time.time()
550 logging.debug("Import %r on %s is now listening on port %s",
551 self._daemon_name, self.node_name, port)
553 self._cbs.ReportListening(self, self._private)
557 if _TimeoutExpired(self._ts_begin, self._timeouts.listen):
558 raise _ImportExportError("Not listening after %s seconds" %
559 self._timeouts.listen)
563 def _GetConnectedCheckEpoch(self):
564 """Returns the time since we started listening.
567 assert self._ts_listening is not None, \
568 ("Checking whether an import is connected is only useful"
569 " once it's been listening")
571 return self._ts_listening
574 class DiskExport(_DiskImportExportBase):
577 def __init__(self, lu, node_name, opts,
578 dest_host, dest_port, instance, source, source_args,
579 timeouts, cbs, private=None):
580 """Initializes this class.
582 @param lu: Logical unit instance
583 @type node_name: string
584 @param node_name: Node name for import
585 @type opts: L{objects.ImportExportOptions}
586 @param opts: Import/export daemon options
587 @type dest_host: string
588 @param dest_host: Destination host name or IP address
589 @type dest_port: number
590 @param dest_port: Destination port number
591 @type instance: L{objects.Instance}
592 @param instance: Instance object
593 @param source: I/O source
594 @param source_args: I/O source
595 @type timeouts: L{ImportExportTimeouts}
596 @param timeouts: Timeouts for this import
597 @type cbs: L{ImportExportCbBase}
598 @param cbs: Callbacks
599 @param private: Private data for callback functions
602 _DiskImportExportBase.__init__(self, lu, node_name, opts,
603 instance, timeouts, cbs, private)
604 self._dest_host = dest_host
605 self._dest_port = dest_port
606 self._source = source
607 self._source_args = source_args
609 def _StartDaemon(self):
610 """Starts the export daemon.
613 return self._lu.rpc.call_export_start(self.node_name, self._opts,
614 self._dest_host, self._dest_port,
615 self._instance, self._source,
618 def CheckListening(self):
619 """Checks whether the daemon is listening.
622 # Only an import can be listening
625 def _GetConnectedCheckEpoch(self):
626 """Returns the time since the daemon started.
629 assert self._ts_begin is not None
631 return self._ts_begin
def FormatProgress(progress):
  """Formats progress information for user consumption.

  @type progress: tuple
  @param progress: Tuple of C{(mbytes, throughput, percent, eta)};
    C{percent} and C{eta} may be C{None}
  @rtype: string
  @return: Comma-joined human-readable summary

  """
  (mbytes, throughput, percent, eta) = progress

  parts = [
    utils.FormatUnit(mbytes, "h"),

    # Not using FormatUnit as it doesn't support kilobytes
    "%0.1f MiB/s" % throughput,
    ]

  # Percentage and ETA are optional and may not be known yet
  if percent is not None:
    parts.append("%d%%" % percent)

  if eta is not None:
    parts.append("ETA %s" % utils.FormatSeconds(eta))

  return utils.CommaJoin(parts)
656 class ImportExportLoop:
660 def __init__(self, lu):
661 """Initializes this class.
666 self._pending_add = []
668 def Add(self, diskie):
669 """Adds an import/export object to the loop.
671 @type diskie: Subclass of L{_DiskImportExportBase}
672 @param diskie: Import/export object
675 assert diskie not in self._pending_add
676 assert diskie.loop is None
680 # Adding new objects to a staging list is necessary, otherwise the main
681 # loop gets confused if callbacks modify the queue while the main loop is
683 self._pending_add.append(diskie)
686 def _CollectDaemonStatus(lu, daemons):
687 """Collects the status for all import/export daemons.
692 for node_name, names in daemons.iteritems():
693 result = lu.rpc.call_impexp_status(node_name, names)
695 lu.LogWarning("Failed to get daemon status on %s: %s",
696 node_name, result.fail_msg)
699 assert len(names) == len(result.payload)
701 daemon_status[node_name] = dict(zip(names, result.payload))
706 def _GetActiveDaemonNames(queue):
707 """Gets the names of all active daemons.
712 if not diskie.active:
716 # Start daemon if necessary
717 daemon_name = diskie.CheckDaemon()
718 except _ImportExportError, err:
719 logging.exception("%s failed", diskie.MODE_TEXT)
720 diskie.Finalize(error=str(err))
723 result.setdefault(diskie.node_name, []).append(daemon_name)
725 assert len(queue) >= len(result)
726 assert len(queue) >= sum([len(names) for names in result.itervalues()])
728 logging.debug("daemons=%r", result)
732 def _AddPendingToQueue(self):
733 """Adds all pending import/export objects to the internal queue.
736 assert compat.all(diskie not in self._queue and diskie.loop == self
737 for diskie in self._pending_add)
739 self._queue.extend(self._pending_add)
741 del self._pending_add[:]
744 """Utility main loop.
748 self._AddPendingToQueue()
750 # Collect all active daemon names
751 daemons = self._GetActiveDaemonNames(self._queue)
755 # Collection daemon status data
756 data = self._CollectDaemonStatus(self._lu, daemons)
759 delay = self.MAX_DELAY
760 for diskie in self._queue:
761 if not diskie.active:
766 all_daemon_data = data[diskie.node_name]
768 result = diskie.SetDaemonData(False, None)
771 diskie.SetDaemonData(True,
772 all_daemon_data[diskie.GetDaemonName()])
775 # Daemon not yet ready, retry soon
776 delay = min(3.0, delay)
779 if diskie.CheckFinished():
784 # Normal case: check again in 5 seconds
785 delay = min(5.0, delay)
787 if not diskie.CheckListening():
788 # Not yet listening, retry soon
789 delay = min(1.0, delay)
792 if not diskie.CheckConnected():
793 # Not yet connected, retry soon
794 delay = min(1.0, delay)
797 except _ImportExportError, err:
798 logging.exception("%s failed", diskie.MODE_TEXT)
799 diskie.Finalize(error=str(err))
801 if not compat.any(diskie.active for diskie in self._queue):
805 delay = min(self.MAX_DELAY, max(self.MIN_DELAY, delay))
806 logging.debug("Waiting for %ss", delay)
809 def FinalizeAll(self):
810 """Finalizes all pending transfers.
815 for diskie in self._queue:
816 success = diskie.Finalize() and success
class _TransferInstCbBase(ImportExportCbBase):
  def __init__(self, lu, feedback_fn, instance, timeouts, src_node, src_cbs,
               dest_node, dest_ip):
    """Initializes this class.

    @param lu: Logical unit instance
    @param feedback_fn: Feedback function
    @type instance: L{objects.Instance}
    @param instance: Instance object
    @type timeouts: L{ImportExportTimeouts}
    @param timeouts: Timeouts for this transfer
    @type src_node: string
    @param src_node: Source node name
    @param src_cbs: Callbacks for the source side (may be C{None})
    @type dest_node: string
    @param dest_node: Destination node name
    @type dest_ip: string
    @param dest_ip: IP address of destination node

    """
    ImportExportCbBase.__init__(self)

    # Every constructor argument is kept for use by subclasses
    self.lu = lu
    self.feedback_fn = feedback_fn
    self.instance = instance
    self.timeouts = timeouts
    self.src_node = src_node
    self.src_cbs = src_cbs
    self.dest_node = dest_node
    self.dest_ip = dest_ip
839 class _TransferInstSourceCb(_TransferInstCbBase):
840 def ReportConnected(self, ie, dtp):
841 """Called when a connection has been established.
844 assert self.src_cbs is None
845 assert dtp.src_export == ie
846 assert dtp.dest_import
848 self.feedback_fn("%s is sending data on %s" %
849 (dtp.data.name, ie.node_name))
851 def ReportProgress(self, ie, dtp):
852 """Called when new progress information should be reported.
855 progress = ie.progress
859 self.feedback_fn("%s sent %s" % (dtp.data.name, FormatProgress(progress)))
861 def ReportFinished(self, ie, dtp):
862 """Called when a transfer has finished.
865 assert self.src_cbs is None
866 assert dtp.src_export == ie
867 assert dtp.dest_import
870 self.feedback_fn("%s finished sending data" % dtp.data.name)
872 self.feedback_fn("%s failed to send data: %s (recent output: %r)" %
873 (dtp.data.name, ie.final_message, ie.recent_output))
875 dtp.RecordResult(ie.success)
877 cb = dtp.data.finished_fn
881 # TODO: Check whether sending SIGTERM right away is okay, maybe we should
882 # give the daemon a moment to sort things out
883 if dtp.dest_import and not ie.success:
884 dtp.dest_import.Abort()
887 class _TransferInstDestCb(_TransferInstCbBase):
888 def ReportListening(self, ie, dtp):
889 """Called when daemon started listening.
893 assert dtp.src_export is None
894 assert dtp.dest_import
895 assert dtp.export_opts
897 self.feedback_fn("%s is now listening, starting export" % dtp.data.name)
899 # Start export on source node
900 de = DiskExport(self.lu, self.src_node, dtp.export_opts,
901 self.dest_ip, ie.listen_port,
902 self.instance, dtp.data.src_io, dtp.data.src_ioargs,
903 self.timeouts, self.src_cbs, private=dtp)
908 def ReportConnected(self, ie, dtp):
909 """Called when a connection has been established.
912 self.feedback_fn("%s is receiving data on %s" %
913 (dtp.data.name, self.dest_node))
915 def ReportFinished(self, ie, dtp):
916 """Called when a transfer has finished.
920 self.feedback_fn("%s finished receiving data" % dtp.data.name)
922 self.feedback_fn("%s failed to receive data: %s (recent output: %r)" %
923 (dtp.data.name, ie.final_message, ie.recent_output))
925 dtp.RecordResult(ie.success)
927 # TODO: Check whether sending SIGTERM right away is okay, maybe we should
928 # give the daemon a moment to sort things out
929 if dtp.src_export and not ie.success:
930 dtp.src_export.Abort()
class DiskTransfer(object):
  """Describes one disk transfer between two nodes.

  Pure data holder; consumed by L{TransferInstanceData}.

  """
  def __init__(self, name, src_io, src_ioargs, dest_io, dest_ioargs,
               finished_fn):
    """Initializes this class.

    @type name: string
    @param name: User-visible name for this transfer (e.g. "disk/0")
    @param src_io: Source I/O type
    @param src_ioargs: Source I/O arguments
    @param dest_io: Destination I/O type
    @param dest_ioargs: Destination I/O arguments
    @type finished_fn: callable
    @param finished_fn: Function called once transfer has finished

    """
    self.name = name

    self.src_io = src_io
    self.src_ioargs = src_ioargs

    self.dest_io = dest_io
    self.dest_ioargs = dest_ioargs

    self.finished_fn = finished_fn
959 class _DiskTransferPrivate(object):
960 def __init__(self, data, success, export_opts):
961 """Initializes this class.
963 @type data: L{DiskTransfer}
968 self.success = success
969 self.export_opts = export_opts
971 self.src_export = None
972 self.dest_import = None
974 def RecordResult(self, success):
975 """Updates the status.
977 One failed part will cause the whole transfer to fail.
980 self.success = self.success and success
def _GetInstDiskMagic(base, instance_name, index):
  """Computes the magic value for a disk export or import.

  @type base: string
  @param base: Random seed value (can be the same for all disks of a transfer)
  @type instance_name: string
  @param instance_name: Name of instance
  @type index: number
  @param index: Disk index
  @rtype: string
  @return: Hex digest unique per (version, seed, instance, disk index)

  """
  h = compat.sha1_hash()
  h.update(str(constants.RIE_VERSION))
  # All inputs must be mixed in so the magic differs per disk and transfer
  h.update(base)
  h.update(instance_name)
  h.update(str(index))
  return h.hexdigest()
1002 def TransferInstanceData(lu, feedback_fn, src_node, dest_node, dest_ip,
1003 instance, all_transfers):
1004 """Transfers an instance's data from one node to another.
1006 @param lu: Logical unit instance
1007 @param feedback_fn: Feedback function
1008 @type src_node: string
1009 @param src_node: Source node name
1010 @type dest_node: string
1011 @param dest_node: Destination node name
1012 @type dest_ip: string
1013 @param dest_ip: IP address of destination node
1014 @type instance: L{objects.Instance}
1015 @param instance: Instance object
1016 @type all_transfers: list of L{DiskTransfer} instances
1017 @param all_transfers: List of all disk transfers to be made
1019 @return: List with a boolean (True=successful, False=failed) for success for
1023 # Disable compression for all moves as these are all within the same cluster
1024 compress = constants.IEC_NONE
1026 logging.debug("Source node %s, destination node %s, compression '%s'",
1027 src_node, dest_node, compress)
1029 timeouts = ImportExportTimeouts(constants.DISK_TRANSFER_CONNECT_TIMEOUT)
1030 src_cbs = _TransferInstSourceCb(lu, feedback_fn, instance, timeouts,
1031 src_node, None, dest_node, dest_ip)
1032 dest_cbs = _TransferInstDestCb(lu, feedback_fn, instance, timeouts,
1033 src_node, src_cbs, dest_node, dest_ip)
1037 base_magic = utils.GenerateSecret(6)
1039 ieloop = ImportExportLoop(lu)
1041 for idx, transfer in enumerate(all_transfers):
1043 feedback_fn("Exporting %s from %s to %s" %
1044 (transfer.name, src_node, dest_node))
1046 magic = _GetInstDiskMagic(base_magic, instance.name, idx)
1047 opts = objects.ImportExportOptions(key_name=None, ca_pem=None,
1048 compress=compress, magic=magic)
1050 dtp = _DiskTransferPrivate(transfer, True, opts)
1052 di = DiskImport(lu, dest_node, opts, instance,
1053 transfer.dest_io, transfer.dest_ioargs,
1054 timeouts, dest_cbs, private=dtp)
1057 dtp.dest_import = di
1059 dtp = _DiskTransferPrivate(None, False, None)
1065 ieloop.FinalizeAll()
1067 assert len(all_dtp) == len(all_transfers)
1068 assert compat.all((dtp.src_export is None or
1069 dtp.src_export.success is not None) and
1070 (dtp.dest_import is None or
1071 dtp.dest_import.success is not None)
1072 for dtp in all_dtp), \
1073 "Not all imports/exports are finalized"
1075 return [bool(dtp.success) for dtp in all_dtp]
1078 class _RemoteExportCb(ImportExportCbBase):
1079 def __init__(self, feedback_fn, disk_count):
1080 """Initializes this class.
1083 ImportExportCbBase.__init__(self)
1084 self._feedback_fn = feedback_fn
1085 self._dresults = [None] * disk_count
1088 def disk_results(self):
1089 """Returns per-disk results.
1092 return self._dresults
1094 def ReportConnected(self, ie, private):
1095 """Called when a connection has been established.
1100 self._feedback_fn("Disk %s is now sending data" % idx)
1102 def ReportProgress(self, ie, private):
1103 """Called when new progress information should be reported.
1108 progress = ie.progress
1112 self._feedback_fn("Disk %s sent %s" % (idx, FormatProgress(progress)))
1114 def ReportFinished(self, ie, private):
1115 """Called when a transfer has finished.
1118 (idx, finished_fn) = private
1121 self._feedback_fn("Disk %s finished sending data" % idx)
1123 self._feedback_fn("Disk %s failed to send data: %s (recent output: %r)" %
1124 (idx, ie.final_message, ie.recent_output))
1126 self._dresults[idx] = bool(ie.success)
1132 class ExportInstanceHelper:
1133 def __init__(self, lu, feedback_fn, instance):
1134 """Initializes this class.
1136 @param lu: Logical unit instance
1137 @param feedback_fn: Feedback function
1138 @type instance: L{objects.Instance}
1139 @param instance: Instance object
1143 self._feedback_fn = feedback_fn
1144 self._instance = instance
1146 self._snap_disks = []
1147 self._removed_snaps = [False] * len(instance.disks)
1149 def CreateSnapshots(self):
1150 """Creates an LVM snapshot for every disk of the instance.
1153 assert not self._snap_disks
1155 instance = self._instance
1156 src_node = instance.primary_node
1158 for idx, disk in enumerate(instance.disks):
1159 self._feedback_fn("Creating a snapshot of disk/%s on node %s" %
1162 # result.payload will be a snapshot of an lvm leaf of the one we
1164 result = self._lu.rpc.call_blockdev_snapshot(src_node, disk)
1166 msg = result.fail_msg
1168 self._lu.LogWarning("Could not snapshot disk/%s on node %s: %s",
1170 elif (not isinstance(result.payload, (tuple, list)) or
1171 len(result.payload) != 2):
1172 self._lu.LogWarning("Could not snapshot disk/%s on node %s: invalid"
1173 " result '%s'", idx, src_node, result.payload)
1175 disk_id = tuple(result.payload)
1176 new_dev = objects.Disk(dev_type=constants.LD_LV, size=disk.size,
1177 logical_id=disk_id, physical_id=disk_id,
1178 iv_name=disk.iv_name)
1180 self._snap_disks.append(new_dev)
1182 assert len(self._snap_disks) == len(instance.disks)
1183 assert len(self._removed_snaps) == len(instance.disks)
1185 def _RemoveSnapshot(self, disk_index):
1186 """Removes an LVM snapshot.
1188 @type disk_index: number
1189 @param disk_index: Index of the snapshot to be removed
1192 disk = self._snap_disks[disk_index]
1193 if disk and not self._removed_snaps[disk_index]:
1194 src_node = self._instance.primary_node
1196 self._feedback_fn("Removing snapshot of disk/%s on node %s" %
1197 (disk_index, src_node))
1199 result = self._lu.rpc.call_blockdev_remove(src_node, disk)
1201 self._lu.LogWarning("Could not remove snapshot for disk/%d from node"
1202 " %s: %s", disk_index, src_node, result.fail_msg)
1204 self._removed_snaps[disk_index] = True
1206 def LocalExport(self, dest_node):
1207 """Intra-cluster instance export.
1209 @type dest_node: L{objects.Node}
1210 @param dest_node: Destination node
1213 instance = self._instance
1214 src_node = instance.primary_node
1216 assert len(self._snap_disks) == len(instance.disks)
1220 for idx, dev in enumerate(self._snap_disks):
1222 transfers.append(None)
1225 path = utils.PathJoin(constants.EXPORT_DIR, "%s.new" % instance.name,
1228 finished_fn = compat.partial(self._TransferFinished, idx)
1230 # FIXME: pass debug option from opcode to backend
1231 dt = DiskTransfer("snapshot/%s" % idx,
1232 constants.IEIO_SCRIPT, (dev, idx),
1233 constants.IEIO_FILE, (path, ),
1235 transfers.append(dt)
1237 # Actually export data
1238 dresults = TransferInstanceData(self._lu, self._feedback_fn,
1239 src_node, dest_node.name,
1240 dest_node.secondary_ip,
1241 instance, transfers)
1243 assert len(dresults) == len(instance.disks)
1245 self._feedback_fn("Finalizing export on %s" % dest_node.name)
1246 result = self._lu.rpc.call_finalize_export(dest_node.name, instance,
1248 msg = result.fail_msg
1251 self._lu.LogWarning("Could not finalize export for instance %s"
1252 " on node %s: %s", instance.name, dest_node.name, msg)
1254 return (fin_resu, dresults)
1256 def RemoteExport(self, disk_info, key_name, dest_ca_pem, timeouts):
1257 """Inter-cluster instance export.
1259 @type disk_info: list
1260 @param disk_info: Per-disk destination information
1261 @type key_name: string
1262 @param key_name: Name of X509 key to use
1263 @type dest_ca_pem: string
1264 @param dest_ca_pem: Destination X509 CA in PEM format
1265 @type timeouts: L{ImportExportTimeouts}
1266 @param timeouts: Timeouts for this import
1269 instance = self._instance
1271 assert len(disk_info) == len(instance.disks)
1273 cbs = _RemoteExportCb(self._feedback_fn, len(instance.disks))
1275 ieloop = ImportExportLoop(self._lu)
1277 for idx, (dev, (host, port, magic)) in enumerate(zip(instance.disks,
1279 # Decide whether to use IPv6
1280 ipv6 = netutils.IP6Address.IsValid(host)
1282 opts = objects.ImportExportOptions(key_name=key_name,
1284 magic=magic, ipv6=ipv6)
1286 self._feedback_fn("Sending disk %s to %s:%s" % (idx, host, port))
1287 finished_fn = compat.partial(self._TransferFinished, idx)
1288 ieloop.Add(DiskExport(self._lu, instance.primary_node,
1289 opts, host, port, instance,
1290 constants.IEIO_SCRIPT, (dev, idx),
1291 timeouts, cbs, private=(idx, finished_fn)))
1295 ieloop.FinalizeAll()
1297 return (True, cbs.disk_results)
1299 def _TransferFinished(self, idx):
1300 """Called once a transfer has finished.
1303 @param idx: Disk index
1306 logging.debug("Transfer %s finished", idx)
1307 self._RemoveSnapshot(idx)
1310 """Remove all snapshots.
1313 assert len(self._removed_snaps) == len(self._instance.disks)
1314 for idx in range(len(self._instance.disks)):
1315 self._RemoveSnapshot(idx)
class _RemoteImportCb(ImportExportCbBase):
  """Import/export callbacks for a remote instance import.

  Tracks the listening port and magic value of every per-disk import daemon;
  once all daemons are listening, the signed connection information is sent
  to the client through the feedback function.

  """
  def __init__(self, feedback_fn, cds, x509_cert_pem, disk_count,
               external_address):
    """Initializes this class.

    @param feedback_fn: Feedback function
    @type cds: string
    @param cds: Cluster domain secret
    @type x509_cert_pem: string
    @param x509_cert_pem: CA used for signing import key
    @type disk_count: number
    @param disk_count: Number of disks
    @type external_address: string
    @param external_address: External address of destination node

    """
    ImportExportCbBase.__init__(self)
    self._feedback_fn = feedback_fn
    self._cds = cds
    self._x509_cert_pem = x509_cert_pem
    self._disk_count = disk_count
    self._external_address = external_address

    # Final per-disk status, filled in by L{ReportFinished}
    self._dresults = [None] * disk_count
    # (port, magic) per disk once its daemon is listening
    self._daemon_port = [None] * disk_count

    # Salt for signing the per-disk connection information
    self._salt = utils.GenerateSecret(8)

  @property
  def disk_results(self):
    """Returns per-disk results.

    """
    return self._dresults

  def _CheckAllListening(self):
    """Checks whether all daemons are listening.

    If all daemons are listening, the information is sent to the client.

    """
    ready = compat.all(dp is not None for dp in self._daemon_port)
    if not ready:
      return

    host = self._external_address

    # Sign the connection details of every disk with the cluster domain
    # secret so the source cluster can verify them
    disks = [ComputeRemoteImportDiskInfo(self._cds, self._salt,
                                         idx, host, port, magic)
             for idx, (port, magic) in enumerate(self._daemon_port)]

    assert len(disks) == self._disk_count

    self._feedback_fn(constants.ELOG_REMOTE_IMPORT, {
      "disks": disks,
      "x509_ca": self._x509_cert_pem,
      })

  def ReportListening(self, ie, private):
    """Called when daemon started listening.

    @param ie: Import/export object
    @param private: Disk index as a single-element tuple

    """
    (idx, ) = private

    self._feedback_fn("Disk %s is now listening" % idx)

    assert self._daemon_port[idx] is None

    self._daemon_port[idx] = (ie.listen_port, ie.magic)

    self._CheckAllListening()

  def ReportConnected(self, ie, private):
    """Called when a connection has been established.

    @param ie: Import/export object
    @param private: Disk index as a single-element tuple

    """
    (idx, ) = private

    self._feedback_fn("Disk %s is now receiving data" % idx)

  def ReportFinished(self, ie, private):
    """Called when a transfer has finished.

    @param ie: Import/export object
    @param private: Disk index as a single-element tuple

    """
    (idx, ) = private

    # Daemon is certainly no longer listening
    self._daemon_port[idx] = None

    success = bool(ie.success)

    if success:
      self._feedback_fn("Disk %s finished receiving data" % idx)
    else:
      self._feedback_fn(("Disk %s failed to receive data: %s"
                         " (recent output: %r)") %
                        (idx, ie.final_message, ie.recent_output))

    self._dresults[idx] = success
def RemoteImport(lu, feedback_fn, instance, pnode, source_x509_ca,
                 cds, timeouts):
  """Imports an instance from another cluster.

  @param lu: Logical unit instance
  @param feedback_fn: Feedback function
  @type instance: L{objects.Instance}
  @param instance: Instance object
  @type pnode: L{objects.Node}
  @param pnode: Primary node of instance as an object
  @type source_x509_ca: OpenSSL.crypto.X509
  @param source_x509_ca: Import source's X509 CA
  @type cds: string
  @param cds: Cluster domain secret
  @type timeouts: L{ImportExportTimeouts}
  @param timeouts: Timeouts for this import

  """
  source_ca_pem = OpenSSL.crypto.dump_certificate(OpenSSL.crypto.FILETYPE_PEM,
                                                  source_x509_ca)

  magic_base = utils.GenerateSecret(6)

  # Decide whether to use IPv6
  ipv6 = netutils.IP6Address.IsValid(pnode.primary_ip)

  # Create crypto key on the instance's primary node
  result = lu.rpc.call_x509_cert_create(instance.primary_node,
                                        constants.RIE_CERT_VALIDITY)
  result.Raise("Can't create X509 key and certificate on %s" % result.node)

  (x509_key_name, x509_cert_pem) = result.payload
  try:
    # Load certificate returned by the node and sign it with the cluster
    # domain secret so the exporting cluster can verify it
    x509_cert = OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_PEM,
                                                x509_cert_pem)

    signed_x509_cert_pem = \
      utils.SignX509Certificate(x509_cert, cds, utils.GenerateSecret(8))

    cbs = _RemoteImportCb(feedback_fn, cds, signed_x509_cert_pem,
                          len(instance.disks), pnode.primary_ip)

    ieloop = ImportExportLoop(lu)
    try:
      for idx, dev in enumerate(instance.disks):
        magic = _GetInstDiskMagic(magic_base, instance.name, idx)

        # Import daemon options
        opts = objects.ImportExportOptions(key_name=x509_key_name,
                                           ca_pem=source_ca_pem,
                                           magic=magic, ipv6=ipv6)

        ieloop.Add(DiskImport(lu, instance.primary_node, opts, instance,
                              constants.IEIO_SCRIPT, (dev, idx),
                              timeouts, cbs, private=(idx, )))

      ieloop.Run()
    finally:
      ieloop.FinalizeAll()
  finally:
    # Remove crypto key and certificate
    result = lu.rpc.call_x509_cert_remove(instance.primary_node, x509_key_name)
    result.Raise("Can't remove X509 key and certificate on %s" % result.node)

  return cbs.disk_results
def _GetImportExportHandshakeMessage(version):
  """Returns the handshake message for a RIE protocol version.

  @type version: number
  @param version: Remote import/export protocol version
  @rtype: string

  """
  message = "%s:%s" % (version, constants.RIE_HANDSHAKE)
  return message
def ComputeRemoteExportHandshake(cds):
  """Computes the remote import/export handshake.

  @type cds: string
  @param cds: Cluster domain secret
  @rtype: tuple
  @return: (version, HMAC digest, salt)

  """
  hmac_salt = utils.GenerateSecret(8)
  message = _GetImportExportHandshakeMessage(constants.RIE_VERSION)
  digest = utils.Sha1Hmac(cds, message, salt=hmac_salt)
  return (constants.RIE_VERSION, digest, hmac_salt)
1506 def CheckRemoteExportHandshake(cds, handshake):
1507 """Checks the handshake of a remote import/export.
1510 @param cds: Cluster domain secret
1511 @type handshake: sequence
1512 @param handshake: Handshake sent by remote peer
1516 (version, hmac_digest, hmac_salt) = handshake
1517 except (TypeError, ValueError), err:
1518 return "Invalid data: %s" % err
1520 if not utils.VerifySha1Hmac(cds, _GetImportExportHandshakeMessage(version),
1521 hmac_digest, salt=hmac_salt):
1522 return "Hash didn't match, clusters don't share the same domain secret"
1524 if version != constants.RIE_VERSION:
1525 return ("Clusters don't have the same remote import/export protocol"
1526 " (local=%s, remote=%s)" %
1527 (constants.RIE_VERSION, version))
1532 def _GetRieDiskInfoMessage(disk_index, host, port, magic):
1533 """Returns the hashed text for import/export disk information.
1535 @type disk_index: number
1536 @param disk_index: Index of disk (included in hash)
1538 @param host: Hostname
1540 @param port: Daemon port
1542 @param magic: Magic value
1545 return "%s:%s:%s:%s" % (disk_index, host, port, magic)
1548 def CheckRemoteExportDiskInfo(cds, disk_index, disk_info):
1549 """Verifies received disk information for an export.
1552 @param cds: Cluster domain secret
1553 @type disk_index: number
1554 @param disk_index: Index of disk (included in hash)
1555 @type disk_info: sequence
1556 @param disk_info: Disk information sent by remote peer
1560 (host, port, magic, hmac_digest, hmac_salt) = disk_info
1561 except (TypeError, ValueError), err:
1562 raise errors.GenericError("Invalid data: %s" % err)
1564 if not (host and port and magic):
1565 raise errors.GenericError("Missing destination host, port or magic")
1567 msg = _GetRieDiskInfoMessage(disk_index, host, port, magic)
1569 if not utils.VerifySha1Hmac(cds, msg, hmac_digest, salt=hmac_salt):
1570 raise errors.GenericError("HMAC is wrong")
1572 if netutils.IP6Address.IsValid(host) or netutils.IP4Address.IsValid(host):
1575 destination = netutils.Hostname.GetNormalizedName(host)
1577 return (destination,
1578 utils.ValidateServiceName(port),
def ComputeRemoteImportDiskInfo(cds, salt, disk_index, host, port, magic):
  """Computes the signed disk information for a remote import.

  @type cds: string
  @param cds: Cluster domain secret
  @type salt: string
  @param salt: HMAC salt
  @type disk_index: number
  @param disk_index: Index of disk (included in hash)
  @type host: string
  @param host: Hostname
  @param port: Daemon port
  @param magic: Magic value
  @rtype: tuple
  @return: (host, port, magic, HMAC digest, salt)

  """
  digest = utils.Sha1Hmac(cds,
                          _GetRieDiskInfoMessage(disk_index, host, port,
                                                 magic),
                          salt=salt)
  return (host, port, magic, digest, salt)