4 # Copyright (C) 2010 Google Inc.
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 # General Public License for more details.
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
22 """Instance-related functions and classes for masterd.
30 from ganeti import constants
31 from ganeti import errors
32 from ganeti import compat
33 from ganeti import utils
34 from ganeti import objects
37 class _ImportExportError(Exception):
38 """Local exception to report import/export errors.
43 class ImportExportTimeouts(object):
44 #: Time until daemon starts writing status file
45 DEFAULT_READY_TIMEOUT = 10
47 #: Length of time until errors cause hard failure
48 DEFAULT_ERROR_TIMEOUT = 10
50 #: Time after which daemon must be listening
51 DEFAULT_LISTEN_TIMEOUT = 10
53 #: Progress update interval
54 DEFAULT_PROGRESS_INTERVAL = 60
64 def __init__(self, connect,
65 listen=DEFAULT_LISTEN_TIMEOUT,
66 error=DEFAULT_ERROR_TIMEOUT,
67 ready=DEFAULT_READY_TIMEOUT,
68 progress=DEFAULT_PROGRESS_INTERVAL):
69 """Initializes this class.
72 @param connect: Timeout for establishing connection
74 @param listen: Timeout for starting to listen for connections
76 @param error: Length of time until errors cause hard failure
78 @param ready: Timeout for daemon to become ready
79 @type progress: number
80 @param progress: Progress update interval
86 self.connect = connect
87 self.progress = progress
90 class ImportExportCbBase(object):
91 """Callbacks for disk import/export.
94 def ReportListening(self, ie, private):
95 """Called when daemon started listening.
97 @type ie: Subclass of L{_DiskImportExportBase}
98 @param ie: Import/export object
99 @param private: Private data passed to import/export object
103 def ReportConnected(self, ie, private):
104 """Called when a connection has been established.
106 @type ie: Subclass of L{_DiskImportExportBase}
107 @param ie: Import/export object
108 @param private: Private data passed to import/export object
112 def ReportProgress(self, ie, private):
113 """Called when new progress information should be reported.
115 @type ie: Subclass of L{_DiskImportExportBase}
116 @param ie: Import/export object
117 @param private: Private data passed to import/export object
121 def ReportFinished(self, ie, private):
122 """Called when a transfer has finished.
124 @type ie: Subclass of L{_DiskImportExportBase}
125 @param ie: Import/export object
126 @param private: Private data passed to import/export object
131 def _TimeoutExpired(epoch, timeout, _time_fn=time.time):
132 """Checks whether a timeout has expired.
135 return _time_fn() > (epoch + timeout)
138 class _DiskImportExportBase(object):
141 def __init__(self, lu, node_name, opts,
142 instance, timeouts, cbs, private=None):
143 """Initializes this class.
145 @param lu: Logical unit instance
146 @type node_name: string
147 @param node_name: Node name for import
148 @type opts: L{objects.ImportExportOptions}
149 @param opts: Import/export daemon options
150 @type instance: L{objects.Instance}
151 @param instance: Instance object
152 @type timeouts: L{ImportExportTimeouts}
153 @param timeouts: Timeouts for this import
154 @type cbs: L{ImportExportCbBase}
155 @param cbs: Callbacks
156 @param private: Private data for callback functions
159 assert self.MODE_TEXT
162 self.node_name = node_name
164 self._instance = instance
165 self._timeouts = timeouts
167 self._private = private
173 self._ts_begin = None
174 self._ts_connected = None
175 self._ts_finished = None
176 self._ts_cleanup = None
177 self._ts_last_progress = None
178 self._ts_last_error = None
182 self.final_message = None
185 self._daemon_name = None
189 def recent_output(self):
190 """Returns the most recent output from the daemon.
194 return self._daemon.recent_output
200 """Returns transfer progress information.
206 return (self._daemon.progress_mbytes,
207 self._daemon.progress_throughput,
208 self._daemon.progress_percent,
209 self._daemon.progress_eta)
213 """Determines whether this transport is still active.
216 return self.success is None
220 """Returns parent loop.
222 @rtype: L{ImportExportLoop}
227 def SetLoop(self, loop):
228 """Sets the parent loop.
230 @type loop: L{ImportExportLoop}
234 raise errors.ProgrammerError("Loop can only be set once")
238 def _StartDaemon(self):
239 """Starts the import/export daemon.
242 raise NotImplementedError()
244 def CheckDaemon(self):
245 """Checks whether daemon has been started and if not, starts it.
251 assert self._ts_cleanup is None
253 if self._daemon_name is None:
254 assert self._ts_begin is None
256 result = self._StartDaemon()
258 raise _ImportExportError("Failed to start %s on %s: %s" %
259 (self.MODE_TEXT, self.node_name,
262 daemon_name = result.payload
264 logging.info("Started %s %r on %s", self.MODE_TEXT, daemon_name,
267 self._ts_begin = time.time()
268 self._daemon_name = daemon_name
270 return self._daemon_name
272 def GetDaemonName(self):
273 """Returns the daemon name.
276 assert self._daemon_name, "Daemon has not been started"
277 assert self._ts_cleanup is None
278 return self._daemon_name
281 """Sends SIGTERM to import/export daemon (if still active).
284 if self._daemon_name:
285 self._lu.LogWarning("Aborting %s %r on %s",
286 self.MODE_TEXT, self._daemon_name, self.node_name)
287 result = self._lu.rpc.call_impexp_abort(self.node_name, self._daemon_name)
289 self._lu.LogWarning("Failed to abort %s %r on %s: %s",
290 self.MODE_TEXT, self._daemon_name,
291 self.node_name, result.fail_msg)
296 def _SetDaemonData(self, data):
297 """Internal function for updating status daemon data.
299 @type data: L{objects.ImportExportStatus}
300 @param data: Daemon status data
303 assert self._ts_begin is not None
306 if _TimeoutExpired(self._ts_begin, self._timeouts.ready):
307 raise _ImportExportError("Didn't become ready after %s seconds" %
308 self._timeouts.ready)
316 def SetDaemonData(self, success, data):
317 """Updates daemon status data.
320 @param success: Whether fetching data was successful or not
321 @type data: L{objects.ImportExportStatus}
322 @param data: Daemon status data
326 if self._ts_last_error is None:
327 self._ts_last_error = time.time()
329 elif _TimeoutExpired(self._ts_last_error, self._timeouts.error):
330 raise _ImportExportError("Too many errors while updating data")
334 self._ts_last_error = None
336 return self._SetDaemonData(data)
338 def CheckListening(self):
339 """Checks whether the daemon is listening.
342 raise NotImplementedError()
344 def _GetConnectedCheckEpoch(self):
345 """Returns timeout to calculate connect timeout.
348 raise NotImplementedError()
350 def CheckConnected(self):
351 """Checks whether the daemon is connected.
354 @return: Whether the daemon is connected
357 assert self._daemon, "Daemon status missing"
359 if self._ts_connected is not None:
362 if self._daemon.connected:
363 self._ts_connected = time.time()
365 # TODO: Log remote peer
366 logging.debug("%s %r on %s is now connected",
367 self.MODE_TEXT, self._daemon_name, self.node_name)
369 self._cbs.ReportConnected(self, self._private)
373 if _TimeoutExpired(self._GetConnectedCheckEpoch(), self._timeouts.connect):
374 raise _ImportExportError("Not connected after %s seconds" %
375 self._timeouts.connect)
379 def _CheckProgress(self):
380 """Checks whether a progress update should be reported.
383 if ((self._ts_last_progress is None or
384 _TimeoutExpired(self._ts_last_progress, self._timeouts.progress)) and
386 self._daemon.progress_mbytes is not None and
387 self._daemon.progress_throughput is not None):
388 self._cbs.ReportProgress(self, self._private)
389 self._ts_last_progress = time.time()
391 def CheckFinished(self):
392 """Checks whether the daemon exited.
395 @return: Whether the transfer is finished
398 assert self._daemon, "Daemon status missing"
400 if self._ts_finished:
403 if self._daemon.exit_status is None:
404 # TODO: Adjust delay for ETA expiring soon
405 self._CheckProgress()
408 self._ts_finished = time.time()
410 self._ReportFinished(self._daemon.exit_status == 0,
411 self._daemon.error_message)
415 def _ReportFinished(self, success, message):
416 """Transfer is finished or daemon exited.
419 @param success: Whether the transfer was successful
420 @type message: string
421 @param message: Error message
424 assert self.success is None
426 self.success = success
427 self.final_message = message
430 logging.info("%s %r on %s succeeded", self.MODE_TEXT, self._daemon_name,
432 elif self._daemon_name:
433 self._lu.LogWarning("%s %r on %s failed: %s",
434 self.MODE_TEXT, self._daemon_name, self.node_name,
437 self._lu.LogWarning("%s on %s failed: %s", self.MODE_TEXT,
438 self.node_name, message)
440 self._cbs.ReportFinished(self, self._private)
443 """Makes the RPC call to finalize this import/export.
446 return self._lu.rpc.call_impexp_cleanup(self.node_name, self._daemon_name)
448 def Finalize(self, error=None):
449 """Finalizes this import/export.
452 if self._daemon_name:
453 logging.info("Finalizing %s %r on %s",
454 self.MODE_TEXT, self._daemon_name, self.node_name)
456 result = self._Finalize()
458 self._lu.LogWarning("Failed to finalize %s %r on %s: %s",
459 self.MODE_TEXT, self._daemon_name,
460 self.node_name, result.fail_msg)
463 # Daemon is no longer running
464 self._daemon_name = None
465 self._ts_cleanup = time.time()
468 self._ReportFinished(False, error)
473 class DiskImport(_DiskImportExportBase):
476 def __init__(self, lu, node_name, opts, instance,
477 dest, dest_args, timeouts, cbs, private=None):
478 """Initializes this class.
480 @param lu: Logical unit instance
481 @type node_name: string
482 @param node_name: Node name for import
483 @type opts: L{objects.ImportExportOptions}
484 @param opts: Import/export daemon options
485 @type instance: L{objects.Instance}
486 @param instance: Instance object
487 @param dest: I/O destination
488 @param dest_args: I/O arguments
489 @type timeouts: L{ImportExportTimeouts}
490 @param timeouts: Timeouts for this import
491 @type cbs: L{ImportExportCbBase}
492 @param cbs: Callbacks
493 @param private: Private data for callback functions
496 _DiskImportExportBase.__init__(self, lu, node_name, opts,
497 instance, timeouts, cbs, private)
499 self._dest_args = dest_args
502 self._ts_listening = None
505 def listen_port(self):
506 """Returns the port the daemon is listening on.
510 return self._daemon.listen_port
514 def _StartDaemon(self):
515 """Starts the import daemon.
518 return self._lu.rpc.call_import_start(self.node_name, self._opts,
520 self._dest, self._dest_args)
522 def CheckListening(self):
523 """Checks whether the daemon is listening.
526 @return: Whether the daemon is listening
529 assert self._daemon, "Daemon status missing"
531 if self._ts_listening is not None:
534 port = self._daemon.listen_port
536 self._ts_listening = time.time()
538 logging.debug("Import %r on %s is now listening on port %s",
539 self._daemon_name, self.node_name, port)
541 self._cbs.ReportListening(self, self._private)
545 if _TimeoutExpired(self._ts_begin, self._timeouts.listen):
546 raise _ImportExportError("Not listening after %s seconds" %
547 self._timeouts.listen)
551 def _GetConnectedCheckEpoch(self):
552 """Returns the time since we started listening.
555 assert self._ts_listening is not None, \
556 ("Checking whether an import is connected is only useful"
557 " once it's been listening")
559 return self._ts_listening
562 class DiskExport(_DiskImportExportBase):
565 def __init__(self, lu, node_name, opts,
566 dest_host, dest_port, instance, source, source_args,
567 timeouts, cbs, private=None):
568 """Initializes this class.
570 @param lu: Logical unit instance
571 @type node_name: string
572 @param node_name: Node name for import
573 @type opts: L{objects.ImportExportOptions}
574 @param opts: Import/export daemon options
575 @type dest_host: string
576 @param dest_host: Destination host name or IP address
577 @type dest_port: number
578 @param dest_port: Destination port number
579 @type instance: L{objects.Instance}
580 @param instance: Instance object
581 @param source: I/O source
582 @param source_args: I/O source
583 @type timeouts: L{ImportExportTimeouts}
584 @param timeouts: Timeouts for this import
585 @type cbs: L{ImportExportCbBase}
586 @param cbs: Callbacks
587 @param private: Private data for callback functions
590 _DiskImportExportBase.__init__(self, lu, node_name, opts,
591 instance, timeouts, cbs, private)
592 self._dest_host = dest_host
593 self._dest_port = dest_port
594 self._source = source
595 self._source_args = source_args
597 def _StartDaemon(self):
598 """Starts the export daemon.
601 return self._lu.rpc.call_export_start(self.node_name, self._opts,
602 self._dest_host, self._dest_port,
603 self._instance, self._source,
606 def CheckListening(self):
607 """Checks whether the daemon is listening.
610 # Only an import can be listening
613 def _GetConnectedCheckEpoch(self):
614 """Returns the time since the daemon started.
617 assert self._ts_begin is not None
619 return self._ts_begin
622 def FormatProgress(progress):
623 """Formats progress information for user consumption
626 (mbytes, throughput, percent, _) = progress
629 utils.FormatUnit(mbytes, "h"),
631 # Not using FormatUnit as it doesn't support kilobytes
632 "%0.1f MiB/s" % throughput,
635 if percent is not None:
636 parts.append("%d%%" % percent)
640 return utils.CommaJoin(parts)
643 class ImportExportLoop:
647 def __init__(self, lu):
648 """Initializes this class.
653 self._pending_add = []
655 def Add(self, diskie):
656 """Adds an import/export object to the loop.
658 @type diskie: Subclass of L{_DiskImportExportBase}
659 @param diskie: Import/export object
662 assert diskie not in self._pending_add
663 assert diskie.loop is None
667 # Adding new objects to a staging list is necessary, otherwise the main
668 # loop gets confused if callbacks modify the queue while the main loop is
670 self._pending_add.append(diskie)
673 def _CollectDaemonStatus(lu, daemons):
674 """Collects the status for all import/export daemons.
679 for node_name, names in daemons.iteritems():
680 result = lu.rpc.call_impexp_status(node_name, names)
682 lu.LogWarning("Failed to get daemon status on %s: %s",
683 node_name, result.fail_msg)
686 assert len(names) == len(result.payload)
688 daemon_status[node_name] = dict(zip(names, result.payload))
693 def _GetActiveDaemonNames(queue):
694 """Gets the names of all active daemons.
699 if not diskie.active:
703 # Start daemon if necessary
704 daemon_name = diskie.CheckDaemon()
705 except _ImportExportError, err:
706 logging.exception("%s failed", diskie.MODE_TEXT)
707 diskie.Finalize(error=str(err))
710 result.setdefault(diskie.node_name, []).append(daemon_name)
712 assert len(queue) >= len(result)
713 assert len(queue) >= sum([len(names) for names in result.itervalues()])
715 logging.debug("daemons=%r", result)
719 def _AddPendingToQueue(self):
720 """Adds all pending import/export objects to the internal queue.
723 assert compat.all(diskie not in self._queue and diskie.loop == self
724 for diskie in self._pending_add)
726 self._queue.extend(self._pending_add)
728 del self._pending_add[:]
731 """Utility main loop.
735 self._AddPendingToQueue()
737 # Collect all active daemon names
738 daemons = self._GetActiveDaemonNames(self._queue)
742 # Collection daemon status data
743 data = self._CollectDaemonStatus(self._lu, daemons)
746 delay = self.MAX_DELAY
747 for diskie in self._queue:
748 if not diskie.active:
753 all_daemon_data = data[diskie.node_name]
755 result = diskie.SetDaemonData(False, None)
758 diskie.SetDaemonData(True,
759 all_daemon_data[diskie.GetDaemonName()])
762 # Daemon not yet ready, retry soon
763 delay = min(3.0, delay)
766 if diskie.CheckFinished():
771 # Normal case: check again in 5 seconds
772 delay = min(5.0, delay)
774 if not diskie.CheckListening():
775 # Not yet listening, retry soon
776 delay = min(1.0, delay)
779 if not diskie.CheckConnected():
780 # Not yet connected, retry soon
781 delay = min(1.0, delay)
784 except _ImportExportError, err:
785 logging.exception("%s failed", diskie.MODE_TEXT)
786 diskie.Finalize(error=str(err))
788 if not compat.any([diskie.active for diskie in self._queue]):
792 delay = min(self.MAX_DELAY, max(self.MIN_DELAY, delay))
793 logging.debug("Waiting for %ss", delay)
796 def FinalizeAll(self):
797 """Finalizes all pending transfers.
802 for diskie in self._queue:
803 success = diskie.Finalize() and success
808 class _TransferInstCbBase(ImportExportCbBase):
809 def __init__(self, lu, feedback_fn, instance, timeouts, src_node, src_cbs,
810 dest_node, dest_ip, export_opts):
811 """Initializes this class.
814 ImportExportCbBase.__init__(self)
817 self.feedback_fn = feedback_fn
818 self.instance = instance
819 self.timeouts = timeouts
820 self.src_node = src_node
821 self.src_cbs = src_cbs
822 self.dest_node = dest_node
823 self.dest_ip = dest_ip
824 self.export_opts = export_opts
827 class _TransferInstSourceCb(_TransferInstCbBase):
828 def ReportConnected(self, ie, dtp):
829 """Called when a connection has been established.
832 assert self.src_cbs is None
833 assert dtp.src_export == ie
834 assert dtp.dest_import
836 self.feedback_fn("%s is sending data on %s" %
837 (dtp.data.name, ie.node_name))
839 def ReportProgress(self, ie, dtp):
840 """Called when new progress information should be reported.
843 progress = ie.progress
847 self.feedback_fn("%s sent %s" % (dtp.data.name, FormatProgress(progress)))
849 def ReportFinished(self, ie, dtp):
850 """Called when a transfer has finished.
853 assert self.src_cbs is None
854 assert dtp.src_export == ie
855 assert dtp.dest_import
858 self.feedback_fn("%s finished sending data" % dtp.data.name)
860 self.feedback_fn("%s failed to send data: %s (recent output: %r)" %
861 (dtp.data.name, ie.final_message, ie.recent_output))
863 dtp.RecordResult(ie.success)
865 cb = dtp.data.finished_fn
869 # TODO: Check whether sending SIGTERM right away is okay, maybe we should
870 # give the daemon a moment to sort things out
871 if dtp.dest_import and not ie.success:
872 dtp.dest_import.Abort()
875 class _TransferInstDestCb(_TransferInstCbBase):
876 def ReportListening(self, ie, dtp):
877 """Called when daemon started listening.
881 assert dtp.src_export is None
882 assert dtp.dest_import
884 self.feedback_fn("%s is now listening, starting export" % dtp.data.name)
886 # Start export on source node
887 de = DiskExport(self.lu, self.src_node, self.export_opts,
888 self.dest_ip, ie.listen_port,
889 self.instance, dtp.data.src_io, dtp.data.src_ioargs,
890 self.timeouts, self.src_cbs, private=dtp)
895 def ReportConnected(self, ie, dtp):
896 """Called when a connection has been established.
899 self.feedback_fn("%s is receiving data on %s" %
900 (dtp.data.name, self.dest_node))
902 def ReportFinished(self, ie, dtp):
903 """Called when a transfer has finished.
907 self.feedback_fn("%s finished receiving data" % dtp.data.name)
909 self.feedback_fn("%s failed to receive data: %s (recent output: %r)" %
910 (dtp.data.name, ie.final_message, ie.recent_output))
912 dtp.RecordResult(ie.success)
914 # TODO: Check whether sending SIGTERM right away is okay, maybe we should
915 # give the daemon a moment to sort things out
916 if dtp.src_export and not ie.success:
917 dtp.src_export.Abort()
920 class DiskTransfer(object):
921 def __init__(self, name, src_io, src_ioargs, dest_io, dest_ioargs,
923 """Initializes this class.
926 @param name: User-visible name for this transfer (e.g. "disk/0")
927 @param src_io: Source I/O type
928 @param src_ioargs: Source I/O arguments
929 @param dest_io: Destination I/O type
930 @param dest_ioargs: Destination I/O arguments
931 @type finished_fn: callable
932 @param finished_fn: Function called once transfer has finished
938 self.src_ioargs = src_ioargs
940 self.dest_io = dest_io
941 self.dest_ioargs = dest_ioargs
943 self.finished_fn = finished_fn
946 class _DiskTransferPrivate(object):
947 def __init__(self, data, success):
948 """Initializes this class.
950 @type data: L{DiskTransfer}
956 self.src_export = None
957 self.dest_import = None
959 self.success = success
961 def RecordResult(self, success):
962 """Updates the status.
964 One failed part will cause the whole transfer to fail.
967 self.success = self.success and success
970 def TransferInstanceData(lu, feedback_fn, src_node, dest_node, dest_ip,
971 instance, all_transfers):
972 """Transfers an instance's data from one node to another.
974 @param lu: Logical unit instance
975 @param feedback_fn: Feedback function
976 @type src_node: string
977 @param src_node: Source node name
978 @type dest_node: string
979 @param dest_node: Destination node name
980 @type dest_ip: string
981 @param dest_ip: IP address of destination node
982 @type instance: L{objects.Instance}
983 @param instance: Instance object
984 @type all_transfers: list of L{DiskTransfer} instances
985 @param all_transfers: List of all disk transfers to be made
987 @return: List with a boolean (True=successful, False=failed) for success for
991 # Compress only if transfer is to another node
992 if src_node == dest_node:
993 compress = constants.IEC_NONE
995 compress = constants.IEC_GZIP
997 logging.debug("Source node %s, destination node %s, compression '%s'",
998 src_node, dest_node, compress)
1000 opts = objects.ImportExportOptions(key_name=None, ca_pem=None,
1003 timeouts = ImportExportTimeouts(constants.DISK_TRANSFER_CONNECT_TIMEOUT)
1004 src_cbs = _TransferInstSourceCb(lu, feedback_fn, instance, timeouts,
1005 src_node, None, dest_node, dest_ip, opts)
1006 dest_cbs = _TransferInstDestCb(lu, feedback_fn, instance, timeouts,
1007 src_node, src_cbs, dest_node, dest_ip, opts)
1011 ieloop = ImportExportLoop(lu)
1013 for transfer in all_transfers:
1015 feedback_fn("Exporting %s from %s to %s" %
1016 (transfer.name, src_node, dest_node))
1018 dtp = _DiskTransferPrivate(transfer, True)
1020 di = DiskImport(lu, dest_node, opts, instance,
1021 transfer.dest_io, transfer.dest_ioargs,
1022 timeouts, dest_cbs, private=dtp)
1025 dtp.dest_import = di
1027 dtp = _DiskTransferPrivate(None, False)
1033 ieloop.FinalizeAll()
1035 assert len(all_dtp) == len(all_transfers)
1036 assert compat.all([(dtp.src_export is None or
1037 dtp.src_export.success is not None) and
1038 (dtp.dest_import is None or
1039 dtp.dest_import.success is not None)
1040 for dtp in all_dtp]), \
1041 "Not all imports/exports are finalized"
1043 return [bool(dtp.success) for dtp in all_dtp]
1046 class _RemoteExportCb(ImportExportCbBase):
1047 def __init__(self, feedback_fn, disk_count):
1048 """Initializes this class.
1051 ImportExportCbBase.__init__(self)
1052 self._feedback_fn = feedback_fn
1053 self._dresults = [None] * disk_count
1056 def disk_results(self):
1057 """Returns per-disk results.
1060 return self._dresults
1062 def ReportConnected(self, ie, private):
1063 """Called when a connection has been established.
1068 self._feedback_fn("Disk %s is now sending data" % idx)
1070 def ReportProgress(self, ie, private):
1071 """Called when new progress information should be reported.
1076 progress = ie.progress
1080 self._feedback_fn("Disk %s sent %s" % (idx, FormatProgress(progress)))
1082 def ReportFinished(self, ie, private):
1083 """Called when a transfer has finished.
1086 (idx, finished_fn) = private
1089 self._feedback_fn("Disk %s finished sending data" % idx)
1091 self._feedback_fn("Disk %s failed to send data: %s (recent output: %r)" %
1092 (idx, ie.final_message, ie.recent_output))
1094 self._dresults[idx] = bool(ie.success)
1100 class ExportInstanceHelper:
1101 def __init__(self, lu, feedback_fn, instance):
1102 """Initializes this class.
1104 @param lu: Logical unit instance
1105 @param feedback_fn: Feedback function
1106 @type instance: L{objects.Instance}
1107 @param instance: Instance object
1111 self._feedback_fn = feedback_fn
1112 self._instance = instance
1114 self._snap_disks = []
1115 self._removed_snaps = [False] * len(instance.disks)
1117 def CreateSnapshots(self):
1118 """Creates an LVM snapshot for every disk of the instance.
1121 assert not self._snap_disks
1123 instance = self._instance
1124 src_node = instance.primary_node
1126 vgname = self._lu.cfg.GetVGName()
1128 for idx, disk in enumerate(instance.disks):
1129 self._feedback_fn("Creating a snapshot of disk/%s on node %s" %
1132 # result.payload will be a snapshot of an lvm leaf of the one we
1134 result = self._lu.rpc.call_blockdev_snapshot(src_node, disk)
1135 msg = result.fail_msg
1137 self._lu.LogWarning("Could not snapshot disk/%s on node %s: %s",
1141 disk_id = (vgname, result.payload)
1142 new_dev = objects.Disk(dev_type=constants.LD_LV, size=disk.size,
1143 logical_id=disk_id, physical_id=disk_id,
1144 iv_name=disk.iv_name)
1146 self._snap_disks.append(new_dev)
1148 assert len(self._snap_disks) == len(instance.disks)
1149 assert len(self._removed_snaps) == len(instance.disks)
1151 def _RemoveSnapshot(self, disk_index):
1152 """Removes an LVM snapshot.
1154 @type disk_index: number
1155 @param disk_index: Index of the snapshot to be removed
1158 disk = self._snap_disks[disk_index]
1159 if disk and not self._removed_snaps[disk_index]:
1160 src_node = self._instance.primary_node
1162 self._feedback_fn("Removing snapshot of disk/%s on node %s" %
1163 (disk_index, src_node))
1165 result = self._lu.rpc.call_blockdev_remove(src_node, disk)
1167 self._lu.LogWarning("Could not remove snapshot for disk/%d from node"
1168 " %s: %s", disk_index, src_node, result.fail_msg)
1170 self._removed_snaps[disk_index] = True
1172 def LocalExport(self, dest_node):
1173 """Intra-cluster instance export.
1175 @type dest_node: L{objects.Node}
1176 @param dest_node: Destination node
1179 instance = self._instance
1180 src_node = instance.primary_node
1182 assert len(self._snap_disks) == len(instance.disks)
1186 for idx, dev in enumerate(self._snap_disks):
1188 transfers.append(None)
1191 path = utils.PathJoin(constants.EXPORT_DIR, "%s.new" % instance.name,
1194 finished_fn = compat.partial(self._TransferFinished, idx)
1196 # FIXME: pass debug option from opcode to backend
1197 dt = DiskTransfer("snapshot/%s" % idx,
1198 constants.IEIO_SCRIPT, (dev, idx),
1199 constants.IEIO_FILE, (path, ),
1201 transfers.append(dt)
1203 # Actually export data
1204 dresults = TransferInstanceData(self._lu, self._feedback_fn,
1205 src_node, dest_node.name,
1206 dest_node.secondary_ip,
1207 instance, transfers)
1209 assert len(dresults) == len(instance.disks)
1211 self._feedback_fn("Finalizing export on %s" % dest_node.name)
1212 result = self._lu.rpc.call_finalize_export(dest_node.name, instance,
1214 msg = result.fail_msg
1217 self._lu.LogWarning("Could not finalize export for instance %s"
1218 " on node %s: %s", instance.name, dest_node.name, msg)
1220 return (fin_resu, dresults)
1222 def RemoteExport(self, opts, disk_info, timeouts):
1223 """Inter-cluster instance export.
1225 @type opts: L{objects.ImportExportOptions}
1226 @param opts: Import/export daemon options
1227 @type disk_info: list
1228 @param disk_info: Per-disk destination information
1229 @type timeouts: L{ImportExportTimeouts}
1230 @param timeouts: Timeouts for this import
1233 instance = self._instance
1235 assert len(disk_info) == len(instance.disks)
1237 cbs = _RemoteExportCb(self._feedback_fn, len(instance.disks))
1239 ieloop = ImportExportLoop(self._lu)
1241 for idx, (dev, (host, port, _, _)) in enumerate(zip(instance.disks,
1243 self._feedback_fn("Sending disk %s to %s:%s" % (idx, host, port))
1244 finished_fn = compat.partial(self._TransferFinished, idx)
1245 ieloop.Add(DiskExport(self._lu, instance.primary_node,
1246 opts, host, port, instance,
1247 constants.IEIO_SCRIPT, (dev, idx),
1248 timeouts, cbs, private=(idx, finished_fn)))
1252 ieloop.FinalizeAll()
1254 return (True, cbs.disk_results)
1256 def _TransferFinished(self, idx):
1257 """Called once a transfer has finished.
1260 @param idx: Disk index
1263 logging.debug("Transfer %s finished", idx)
1264 self._RemoveSnapshot(idx)
1267 """Remove all snapshots.
1270 assert len(self._removed_snaps) == len(self._instance.disks)
1271 for idx in range(len(self._instance.disks)):
1272 self._RemoveSnapshot(idx)
1275 class _RemoteImportCb(ImportExportCbBase):
1276 def __init__(self, feedback_fn, cds, x509_cert_pem, disk_count,
1278 """Initializes this class.
1281 @param cds: Cluster domain secret
1282 @type x509_cert_pem: string
1283 @param x509_cert_pem: CA used for signing import key
1284 @type disk_count: number
1285 @param disk_count: Number of disks
1286 @type external_address: string
1287 @param external_address: External address of destination node
1290 ImportExportCbBase.__init__(self)
1291 self._feedback_fn = feedback_fn
1293 self._x509_cert_pem = x509_cert_pem
1294 self._disk_count = disk_count
1295 self._external_address = external_address
1297 self._dresults = [None] * disk_count
1298 self._daemon_port = [None] * disk_count
1300 self._salt = utils.GenerateSecret(8)
1303 def disk_results(self):
1304 """Returns per-disk results.
1307 return self._dresults
1309 def _CheckAllListening(self):
1310 """Checks whether all daemons are listening.
1312 If all daemons are listening, the information is sent to the client.
1315 if not compat.all(dp is not None for dp in self._daemon_port):
1318 host = self._external_address
1321 for idx, port in enumerate(self._daemon_port):
1322 disks.append(ComputeRemoteImportDiskInfo(self._cds, self._salt,
1325 assert len(disks) == self._disk_count
1327 self._feedback_fn(constants.ELOG_REMOTE_IMPORT, {
1329 "x509_ca": self._x509_cert_pem,
1332 def ReportListening(self, ie, private):
1333 """Called when daemon started listening.
1338 self._feedback_fn("Disk %s is now listening" % idx)
1340 assert self._daemon_port[idx] is None
1342 self._daemon_port[idx] = ie.listen_port
1344 self._CheckAllListening()
1346 def ReportConnected(self, ie, private):
1347 """Called when a connection has been established.
1352 self._feedback_fn("Disk %s is now receiving data" % idx)
1354 def ReportFinished(self, ie, private):
1355 """Called when a transfer has finished.
1360 # Daemon is certainly no longer listening
1361 self._daemon_port[idx] = None
1364 self._feedback_fn("Disk %s finished receiving data" % idx)
1366 self._feedback_fn(("Disk %s failed to receive data: %s"
1367 " (recent output: %r)") %
1368 (idx, ie.final_message, ie.recent_output))
1370 self._dresults[idx] = bool(ie.success)
1373 def RemoteImport(lu, feedback_fn, instance, source_x509_ca, cds, timeouts):
1374 """Imports an instance from another cluster.
1376 @param lu: Logical unit instance
1377 @param feedback_fn: Feedback function
1378 @type instance: L{objects.Instance}
1379 @param instance: Instance object
1380 @type source_x509_ca: OpenSSL.crypto.X509
1381 @param source_x509_ca: Import source's X509 CA
1383 @param cds: Cluster domain secret
1384 @type timeouts: L{ImportExportTimeouts}
1385 @param timeouts: Timeouts for this import
1388 source_ca_pem = OpenSSL.crypto.dump_certificate(OpenSSL.crypto.FILETYPE_PEM,
1392 result = lu.rpc.call_x509_cert_create(instance.primary_node,
1393 constants.RIE_CERT_VALIDITY)
1394 result.Raise("Can't create X509 key and certificate on %s" % result.node)
1396 (x509_key_name, x509_cert_pem) = result.payload
1399 x509_cert = OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_PEM,
1402 # Import daemon options
1403 opts = objects.ImportExportOptions(key_name=x509_key_name,
1404 ca_pem=source_ca_pem)
1407 signed_x509_cert_pem = \
1408 utils.SignX509Certificate(x509_cert, cds, utils.GenerateSecret(8))
1410 cbs = _RemoteImportCb(feedback_fn, cds, signed_x509_cert_pem,
1411 len(instance.disks), instance.primary_node)
1413 ieloop = ImportExportLoop(lu)
1415 for idx, dev in enumerate(instance.disks):
1416 ieloop.Add(DiskImport(lu, instance.primary_node, opts, instance,
1417 constants.IEIO_SCRIPT, (dev, idx),
1418 timeouts, cbs, private=(idx, )))
1422 ieloop.FinalizeAll()
1424 # Remove crypto key and certificate
1425 result = lu.rpc.call_x509_cert_remove(instance.primary_node, x509_key_name)
1426 result.Raise("Can't remove X509 key and certificate on %s" % result.node)
1428 return cbs.disk_results
def _GetImportExportHandshakeMessage(version):
  """Builds the plain-text handshake message for a RIE protocol version.

  This is the string whose HMAC is exchanged between clusters.

  @type version: number
  @param version: Remote import/export protocol version
  @rtype: string

  """
  parts = (version, constants.RIE_HANDSHAKE)
  return "%s:%s" % parts
def ComputeRemoteExportHandshake(cds):
  """Computes the remote import/export handshake tuple.

  @type cds: string
  @param cds: Cluster domain secret
  @rtype: tuple
  @return: Tuple of (protocol version, HMAC digest, HMAC salt)

  """
  version = constants.RIE_VERSION
  # Fresh salt per handshake so digests are not replayable
  salt = utils.GenerateSecret(8)
  digest = utils.Sha1Hmac(cds, _GetImportExportHandshakeMessage(version),
                          salt=salt)
  return (version, digest, salt)
1452 def CheckRemoteExportHandshake(cds, handshake):
1453 """Checks the handshake of a remote import/export.
1456 @param cds: Cluster domain secret
1457 @type handshake: sequence
1458 @param handshake: Handshake sent by remote peer
1462 (version, hmac_digest, hmac_salt) = handshake
1463 except (TypeError, ValueError), err:
1464 return "Invalid data: %s" % err
1466 if not utils.VerifySha1Hmac(cds, _GetImportExportHandshakeMessage(version),
1467 hmac_digest, salt=hmac_salt):
1468 return "Hash didn't match, clusters don't share the same domain secret"
1470 if version != constants.RIE_VERSION:
1471 return ("Clusters don't have the same remote import/export protocol"
1472 " (local=%s, remote=%s)" %
1473 (constants.RIE_VERSION, version))
1478 def _GetRieDiskInfoMessage(disk_index, host, port):
1479 """Returns the hashed text for import/export disk information.
1481 @type disk_index: number
1482 @param disk_index: Index of disk (included in hash)
1484 @param host: Hostname
1486 @param port: Daemon port
1489 return "%s:%s:%s" % (disk_index, host, port)
1492 def CheckRemoteExportDiskInfo(cds, disk_index, disk_info):
1493 """Verifies received disk information for an export.
1496 @param cds: Cluster domain secret
1497 @type disk_index: number
1498 @param disk_index: Index of disk (included in hash)
1499 @type disk_info: sequence
1500 @param disk_info: Disk information sent by remote peer
1504 (host, port, hmac_digest, hmac_salt) = disk_info
1505 except (TypeError, ValueError), err:
1506 raise errors.GenericError("Invalid data: %s" % err)
1508 if not (host and port):
1509 raise errors.GenericError("Missing destination host or port")
1511 msg = _GetRieDiskInfoMessage(disk_index, host, port)
1513 if not utils.VerifySha1Hmac(cds, msg, hmac_digest, salt=hmac_salt):
1514 raise errors.GenericError("HMAC is wrong")
def ComputeRemoteImportDiskInfo(cds, salt, disk_index, host, port):
  """Computes the signed disk information for a remote import.

  @type cds: string
  @param cds: Cluster domain secret
  @type salt: string
  @param salt: HMAC salt
  @type disk_index: number
  @param disk_index: Index of disk (included in hash)
  @type host: string
  @param host: Hostname
  @type port: number
  @param port: Daemon port
  @rtype: tuple
  @return: Tuple of (host, port, HMAC digest, salt)

  """
  digest = utils.Sha1Hmac(cds, _GetRieDiskInfoMessage(disk_index, host, port),
                          salt=salt)
  return (host, port, digest, salt)