4 # Copyright (C) 2010, 2011 Google Inc.
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 # General Public License for more details.
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
"""Instance-related functions and classes for masterd.

"""
import logging
import time

from ganeti import constants
from ganeti import errors
from ganeti import compat
from ganeti import utils
from ganeti import objects
from ganeti import netutils
class _ImportExportError(Exception):
  """Local exception to report import/export errors.

  """
class ImportExportTimeouts(object):
  """Timeouts for disk imports and exports.

  """
  #: Time until daemon starts writing status file
  DEFAULT_READY_TIMEOUT = 10

  #: Length of time until errors cause hard failure
  DEFAULT_ERROR_TIMEOUT = 10

  #: Time after which daemon must be listening
  DEFAULT_LISTEN_TIMEOUT = 10

  #: Progress update interval
  DEFAULT_PROGRESS_INTERVAL = 60

  def __init__(self, connect,
               listen=DEFAULT_LISTEN_TIMEOUT,
               error=DEFAULT_ERROR_TIMEOUT,
               ready=DEFAULT_READY_TIMEOUT,
               progress=DEFAULT_PROGRESS_INTERVAL):
    """Initializes this class.

    @type connect: number
    @param connect: Timeout for establishing connection
    @type listen: number
    @param listen: Timeout for starting to listen for connections
    @type error: number
    @param error: Length of time until errors cause hard failure
    @type ready: number
    @param ready: Timeout for daemon to become ready
    @type progress: number
    @param progress: Progress update interval

    """
    # All values must be stored; consumers read "timeouts.listen",
    # "timeouts.error" and "timeouts.ready" as well as "connect"/"progress"
    self.error = error
    self.ready = ready
    self.listen = listen
    self.connect = connect
    self.progress = progress
class ImportExportCbBase(object):
  """Callbacks for disk import/export.

  """
  def ReportListening(self, ie, private, component):
    """Called when daemon started listening.

    @type ie: Subclass of L{_DiskImportExportBase}
    @param ie: Import/export object
    @param private: Private data passed to import/export object
    @param component: transfer component name

    """

  def ReportConnected(self, ie, private):
    """Called when a connection has been established.

    @type ie: Subclass of L{_DiskImportExportBase}
    @param ie: Import/export object
    @param private: Private data passed to import/export object

    """

  def ReportProgress(self, ie, private):
    """Called when new progress information should be reported.

    @type ie: Subclass of L{_DiskImportExportBase}
    @param ie: Import/export object
    @param private: Private data passed to import/export object

    """

  def ReportFinished(self, ie, private):
    """Called when a transfer has finished.

    @type ie: Subclass of L{_DiskImportExportBase}
    @param ie: Import/export object
    @param private: Private data passed to import/export object

    """
class _DiskImportExportBase(object):
  # Subclasses set this to "import" or "export" for log/error messages
  MODE_TEXT = None

  def __init__(self, lu, node_name, opts,
               instance, component, timeouts, cbs, private=None):
    """Initializes this class.

    @param lu: Logical unit instance
    @type node_name: string
    @param node_name: Node name for import
    @type opts: L{objects.ImportExportOptions}
    @param opts: Import/export daemon options
    @type instance: L{objects.Instance}
    @param instance: Instance object
    @type component: string
    @param component: which part of the instance is being imported
    @type timeouts: L{ImportExportTimeouts}
    @param timeouts: Timeouts for this import
    @type cbs: L{ImportExportCbBase}
    @param cbs: Callbacks
    @param private: Private data for callback functions

    """
    assert self.MODE_TEXT

    self._lu = lu
    self.node_name = node_name
    self._opts = opts.Copy()
    self._instance = instance
    self._component = component
    self._timeouts = timeouts
    self._cbs = cbs
    self._private = private

    # Set master daemon's timeout in options for import/export daemon
    assert self._opts.connect_timeout is None
    self._opts.connect_timeout = timeouts.connect

    # Parent loop, set once via SetLoop
    self._loop = None

    # Timestamps for the phases of a transfer; all unset until reached
    self._ts_begin = None
    self._ts_connected = None
    self._ts_finished = None
    self._ts_cleanup = None
    self._ts_last_progress = None
    self._ts_last_error = None

    # Transfer status: None while active, then True/False
    self.success = None
    self.final_message = None

    # Daemon status
    self._daemon_name = None
    self._daemon = None

  @property
  def recent_output(self):
    """Returns the most recent output from the daemon.

    """
    if self._daemon:
      return "\n".join(self._daemon.recent_output)

    return None

  @property
  def progress(self):
    """Returns transfer progress information.

    """
    if not self._daemon:
      return None

    return (self._daemon.progress_mbytes,
            self._daemon.progress_throughput,
            self._daemon.progress_percent,
            self._daemon.progress_eta)

  @property
  def magic(self):
    """Returns the magic value for this import/export.

    """
    return self._opts.magic

  @property
  def active(self):
    """Determines whether this transport is still active.

    """
    return self.success is None

  @property
  def loop(self):
    """Returns parent loop.

    @rtype: L{ImportExportLoop}

    """
    return self._loop

  def SetLoop(self, loop):
    """Sets the parent loop.

    @type loop: L{ImportExportLoop}

    """
    if self._loop:
      raise errors.ProgrammerError("Loop can only be set once")

    self._loop = loop

  def _StartDaemon(self):
    """Starts the import/export daemon.

    """
    raise NotImplementedError()

  def CheckDaemon(self):
    """Checks whether daemon has been started and if not, starts it.

    @rtype: string
    @return: Daemon name

    """
    assert self._ts_cleanup is None

    if self._daemon_name is None:
      assert self._ts_begin is None

      result = self._StartDaemon()
      if result.fail_msg:
        raise _ImportExportError("Failed to start %s on %s: %s" %
                                 (self.MODE_TEXT, self.node_name,
                                  result.fail_msg))

      daemon_name = result.payload

      logging.info("Started %s '%s' on %s", self.MODE_TEXT, daemon_name,
                   self.node_name)

      self._ts_begin = time.time()
      self._daemon_name = daemon_name

    return self._daemon_name

  def GetDaemonName(self):
    """Returns the daemon name.

    """
    assert self._daemon_name, "Daemon has not been started"
    assert self._ts_cleanup is None
    return self._daemon_name

  def Abort(self):
    """Sends SIGTERM to import/export daemon (if still active).

    """
    if self._daemon_name:
      self._lu.LogWarning("Aborting %s '%s' on %s",
                          self.MODE_TEXT, self._daemon_name, self.node_name)
      result = self._lu.rpc.call_impexp_abort(self.node_name,
                                              self._daemon_name)
      if result.fail_msg:
        self._lu.LogWarning("Failed to abort %s '%s' on %s: %s",
                            self.MODE_TEXT, self._daemon_name,
                            self.node_name, result.fail_msg)
        return False

    return True

  def _SetDaemonData(self, data):
    """Internal function for updating status daemon data.

    @type data: L{objects.ImportExportStatus}
    @param data: Daemon status data

    """
    assert self._ts_begin is not None

    if not data:
      # Daemon hasn't written its status file yet; give up after the
      # "ready" timeout has expired
      if utils.TimeoutExpired(self._ts_begin, self._timeouts.ready):
        raise _ImportExportError("Didn't become ready after %s seconds" %
                                 self._timeouts.ready)

      return False

    self._daemon = data

    return True

  def SetDaemonData(self, success, data):
    """Updates daemon status data.

    @type success: bool
    @param success: Whether fetching data was successful or not
    @type data: L{objects.ImportExportStatus}
    @param data: Daemon status data

    """
    if not success:
      # Track when errors started; only fail hard once they've persisted
      # longer than the error timeout
      if self._ts_last_error is None:
        self._ts_last_error = time.time()

      elif utils.TimeoutExpired(self._ts_last_error, self._timeouts.error):
        raise _ImportExportError("Too many errors while updating data")

      return False

    self._ts_last_error = None

    return self._SetDaemonData(data)

  def CheckListening(self):
    """Checks whether the daemon is listening.

    """
    raise NotImplementedError()

  def _GetConnectedCheckEpoch(self):
    """Returns timeout to calculate connect timeout.

    """
    raise NotImplementedError()

  def CheckConnected(self):
    """Checks whether the daemon is connected.

    @rtype: bool
    @return: Whether the daemon is connected

    """
    assert self._daemon, "Daemon status missing"

    if self._ts_connected is not None:
      return True

    if self._daemon.connected:
      self._ts_connected = time.time()

      # TODO: Log remote peer
      logging.debug("%s '%s' on %s is now connected",
                    self.MODE_TEXT, self._daemon_name, self.node_name)

      self._cbs.ReportConnected(self, self._private)

      return True

    if utils.TimeoutExpired(self._GetConnectedCheckEpoch(),
                            self._timeouts.connect):
      raise _ImportExportError("Not connected after %s seconds" %
                               self._timeouts.connect)

    return False

  def _CheckProgress(self):
    """Checks whether a progress update should be reported.

    """
    if ((self._ts_last_progress is None or
         utils.TimeoutExpired(self._ts_last_progress,
                              self._timeouts.progress)) and
        self._daemon and
        self._daemon.progress_mbytes is not None and
        self._daemon.progress_throughput is not None):
      self._cbs.ReportProgress(self, self._private)
      self._ts_last_progress = time.time()

  def CheckFinished(self):
    """Checks whether the daemon exited.

    @rtype: bool
    @return: Whether the transfer is finished

    """
    assert self._daemon, "Daemon status missing"

    if self._ts_finished:
      return True

    if self._daemon.exit_status is None:
      # TODO: Adjust delay for ETA expiring soon
      self._CheckProgress()
      return False

    self._ts_finished = time.time()

    self._ReportFinished(self._daemon.exit_status == 0,
                         self._daemon.error_message)

    return True

  def _ReportFinished(self, success, message):
    """Transfer is finished or daemon exited.

    @type success: bool
    @param success: Whether the transfer was successful
    @type message: string
    @param message: Error message

    """
    assert self.success is None

    self.success = success
    self.final_message = message

    if success:
      logging.info("%s '%s' on %s succeeded", self.MODE_TEXT,
                   self._daemon_name, self.node_name)
    elif self._daemon_name:
      self._lu.LogWarning("%s '%s' on %s failed: %s",
                          self.MODE_TEXT, self._daemon_name, self.node_name,
                          message)
    else:
      self._lu.LogWarning("%s on %s failed: %s", self.MODE_TEXT,
                          self.node_name, message)

    self._cbs.ReportFinished(self, self._private)

  def _Finalize(self):
    """Makes the RPC call to finalize this import/export.

    """
    return self._lu.rpc.call_impexp_cleanup(self.node_name, self._daemon_name)

  def Finalize(self, error=None):
    """Finalizes this import/export.

    @param error: Error message (if any)

    """
    if self._daemon_name:
      logging.info("Finalizing %s '%s' on %s",
                   self.MODE_TEXT, self._daemon_name, self.node_name)

      result = self._Finalize()
      if result.fail_msg:
        self._lu.LogWarning("Failed to finalize %s '%s' on %s: %s",
                            self.MODE_TEXT, self._daemon_name,
                            self.node_name, result.fail_msg)
        return False

      # Daemon is no longer running
      self._daemon_name = None
      self._ts_cleanup = time.time()

    if error:
      self._ReportFinished(False, error)

    return True
class DiskImport(_DiskImportExportBase):
  MODE_TEXT = "import"

  def __init__(self, lu, node_name, opts, instance, component,
               dest, dest_args, timeouts, cbs, private=None):
    """Initializes this class.

    @param lu: Logical unit instance
    @type node_name: string
    @param node_name: Node name for import
    @type opts: L{objects.ImportExportOptions}
    @param opts: Import/export daemon options
    @type instance: L{objects.Instance}
    @param instance: Instance object
    @type component: string
    @param component: which part of the instance is being imported
    @param dest: I/O destination
    @param dest_args: I/O arguments
    @type timeouts: L{ImportExportTimeouts}
    @param timeouts: Timeouts for this import
    @type cbs: L{ImportExportCbBase}
    @param cbs: Callbacks
    @param private: Private data for callback functions

    """
    _DiskImportExportBase.__init__(self, lu, node_name, opts, instance,
                                   component, timeouts, cbs, private)
    # Both destination and its arguments are needed by _StartDaemon
    self._dest = dest
    self._dest_args = dest_args

    # Timestamp when the daemon started listening
    self._ts_listening = None

  @property
  def listen_port(self):
    """Returns the port the daemon is listening on.

    """
    if self._daemon:
      return self._daemon.listen_port

    return None

  def _StartDaemon(self):
    """Starts the import daemon.

    """
    return self._lu.rpc.call_import_start(self.node_name, self._opts,
                                          self._instance, self._component,
                                          (self._dest, self._dest_args))

  def CheckListening(self):
    """Checks whether the daemon is listening.

    @rtype: bool
    @return: Whether the daemon is listening

    """
    assert self._daemon, "Daemon status missing"

    if self._ts_listening is not None:
      return True

    port = self._daemon.listen_port
    if port is not None:
      self._ts_listening = time.time()

      logging.debug("Import '%s' on %s is now listening on port %s",
                    self._daemon_name, self.node_name, port)

      self._cbs.ReportListening(self, self._private, self._component)

      return True

    if utils.TimeoutExpired(self._ts_begin, self._timeouts.listen):
      raise _ImportExportError("Not listening after %s seconds" %
                               self._timeouts.listen)

    return False

  def _GetConnectedCheckEpoch(self):
    """Returns the time since we started listening.

    """
    assert self._ts_listening is not None, \
           ("Checking whether an import is connected is only useful"
            " once it's been listening")

    return self._ts_listening
class DiskExport(_DiskImportExportBase):
  MODE_TEXT = "export"

  def __init__(self, lu, node_name, opts, dest_host, dest_port,
               instance, component, source, source_args,
               timeouts, cbs, private=None):
    """Initializes this class.

    @param lu: Logical unit instance
    @type node_name: string
    @param node_name: Node name for import
    @type opts: L{objects.ImportExportOptions}
    @param opts: Import/export daemon options
    @type dest_host: string
    @param dest_host: Destination host name or IP address
    @type dest_port: number
    @param dest_port: Destination port number
    @type instance: L{objects.Instance}
    @param instance: Instance object
    @type component: string
    @param component: which part of the instance is being imported
    @param source: I/O source
    @param source_args: I/O source arguments
    @type timeouts: L{ImportExportTimeouts}
    @param timeouts: Timeouts for this import
    @type cbs: L{ImportExportCbBase}
    @param cbs: Callbacks
    @param private: Private data for callback functions

    """
    _DiskImportExportBase.__init__(self, lu, node_name, opts, instance,
                                   component, timeouts, cbs, private)
    self._dest_host = dest_host
    self._dest_port = dest_port
    self._source = source
    self._source_args = source_args

  def _StartDaemon(self):
    """Starts the export daemon.

    """
    return self._lu.rpc.call_export_start(self.node_name, self._opts,
                                          self._dest_host, self._dest_port,
                                          self._instance, self._component,
                                          (self._source, self._source_args))

  def CheckListening(self):
    """Checks whether the daemon is listening.

    """
    # Only an import can be listening; an export is always considered to be
    # "listening" so the main loop proceeds to the connect check
    return True

  def _GetConnectedCheckEpoch(self):
    """Returns the time since the daemon started.

    """
    assert self._ts_begin is not None

    return self._ts_begin
def FormatProgress(progress):
  """Formats progress information for user consumption.

  @type progress: tuple
  @param progress: Progress information as returned by
    L{_DiskImportExportBase.progress}
  @rtype: string

  """
  (mbytes, throughput, percent, eta) = progress

  parts = [
    utils.FormatUnit(mbytes, "h"),

    # Not using FormatUnit as it doesn't support kilobytes
    "%0.1f MiB/s" % throughput,
    ]

  if percent is not None:
    parts.append("%d%%" % percent)

  # ETA may be unknown; only format it when available
  if eta is not None:
    parts.append("ETA %s" % utils.FormatSeconds(eta))

  return utils.CommaJoin(parts)
659 class ImportExportLoop:
663 def __init__(self, lu):
664 """Initializes this class.
669 self._pending_add = []
671 def Add(self, diskie):
672 """Adds an import/export object to the loop.
674 @type diskie: Subclass of L{_DiskImportExportBase}
675 @param diskie: Import/export object
678 assert diskie not in self._pending_add
679 assert diskie.loop is None
683 # Adding new objects to a staging list is necessary, otherwise the main
684 # loop gets confused if callbacks modify the queue while the main loop is
686 self._pending_add.append(diskie)
689 def _CollectDaemonStatus(lu, daemons):
690 """Collects the status for all import/export daemons.
695 for node_name, names in daemons.iteritems():
696 result = lu.rpc.call_impexp_status(node_name, names)
698 lu.LogWarning("Failed to get daemon status on %s: %s",
699 node_name, result.fail_msg)
702 assert len(names) == len(result.payload)
704 daemon_status[node_name] = dict(zip(names, result.payload))
709 def _GetActiveDaemonNames(queue):
710 """Gets the names of all active daemons.
715 if not diskie.active:
719 # Start daemon if necessary
720 daemon_name = diskie.CheckDaemon()
721 except _ImportExportError, err:
722 logging.exception("%s failed", diskie.MODE_TEXT)
723 diskie.Finalize(error=str(err))
726 result.setdefault(diskie.node_name, []).append(daemon_name)
728 assert len(queue) >= len(result)
729 assert len(queue) >= sum([len(names) for names in result.itervalues()])
731 logging.debug("daemons=%r", result)
735 def _AddPendingToQueue(self):
736 """Adds all pending import/export objects to the internal queue.
739 assert compat.all(diskie not in self._queue and diskie.loop == self
740 for diskie in self._pending_add)
742 self._queue.extend(self._pending_add)
744 del self._pending_add[:]
747 """Utility main loop.
751 self._AddPendingToQueue()
753 # Collect all active daemon names
754 daemons = self._GetActiveDaemonNames(self._queue)
758 # Collection daemon status data
759 data = self._CollectDaemonStatus(self._lu, daemons)
762 delay = self.MAX_DELAY
763 for diskie in self._queue:
764 if not diskie.active:
769 all_daemon_data = data[diskie.node_name]
771 result = diskie.SetDaemonData(False, None)
774 diskie.SetDaemonData(True,
775 all_daemon_data[diskie.GetDaemonName()])
778 # Daemon not yet ready, retry soon
779 delay = min(3.0, delay)
782 if diskie.CheckFinished():
787 # Normal case: check again in 5 seconds
788 delay = min(5.0, delay)
790 if not diskie.CheckListening():
791 # Not yet listening, retry soon
792 delay = min(1.0, delay)
795 if not diskie.CheckConnected():
796 # Not yet connected, retry soon
797 delay = min(1.0, delay)
800 except _ImportExportError, err:
801 logging.exception("%s failed", diskie.MODE_TEXT)
802 diskie.Finalize(error=str(err))
804 if not compat.any(diskie.active for diskie in self._queue):
808 delay = min(self.MAX_DELAY, max(self.MIN_DELAY, delay))
809 logging.debug("Waiting for %ss", delay)
812 def FinalizeAll(self):
813 """Finalizes all pending transfers.
818 for diskie in self._queue:
819 success = diskie.Finalize() and success
class _TransferInstCbBase(ImportExportCbBase):
  def __init__(self, lu, feedback_fn, instance, timeouts, src_node, src_cbs,
               dest_node, dest_ip):
    """Initializes this class.

    @param lu: Logical unit instance
    @param feedback_fn: Feedback function
    @type instance: L{objects.Instance}
    @param instance: Instance object
    @type timeouts: L{ImportExportTimeouts}
    @param timeouts: Timeouts for this transfer
    @type src_node: string
    @param src_node: Source node name
    @param src_cbs: Source callbacks (or None for the source side itself)
    @type dest_node: string
    @param dest_node: Destination node name
    @type dest_ip: string
    @param dest_ip: IP address of destination node

    """
    ImportExportCbBase.__init__(self)

    # "lu" is read by subclasses (e.g. to start the export daemon)
    self.lu = lu
    self.feedback_fn = feedback_fn
    self.instance = instance
    self.timeouts = timeouts
    self.src_node = src_node
    self.src_cbs = src_cbs
    self.dest_node = dest_node
    self.dest_ip = dest_ip
class _TransferInstSourceCb(_TransferInstCbBase):
  def ReportConnected(self, ie, dtp):
    """Called when a connection has been established.

    """
    # This callback serves the source side; it carries no further source
    # callbacks of its own
    assert self.src_cbs is None
    assert dtp.src_export == ie
    assert dtp.dest_import

    self.feedback_fn("%s is sending data on %s" %
                     (dtp.data.name, ie.node_name))

  def ReportProgress(self, ie, dtp):
    """Called when new progress information should be reported.

    """
    progress = ie.progress
    if not progress:
      return

    self.feedback_fn("%s sent %s" % (dtp.data.name, FormatProgress(progress)))

  def ReportFinished(self, ie, dtp):
    """Called when a transfer has finished.

    """
    assert self.src_cbs is None
    assert dtp.src_export == ie
    assert dtp.dest_import

    if ie.success:
      self.feedback_fn("%s finished sending data" % dtp.data.name)
    else:
      self.feedback_fn("%s failed to send data: %s (recent output: %s)" %
                       (dtp.data.name, ie.final_message, ie.recent_output))

    dtp.RecordResult(ie.success)

    cb = dtp.data.finished_fn
    if cb:
      cb()

    # TODO: Check whether sending SIGTERM right away is okay, maybe we should
    # give the daemon a moment to sort things out
    if dtp.dest_import and not ie.success:
      dtp.dest_import.Abort()
class _TransferInstDestCb(_TransferInstCbBase):
  def ReportListening(self, ie, dtp, component):
    """Called when daemon started listening.

    """
    assert self.src_cbs
    assert dtp.src_export is None
    assert dtp.dest_import
    assert dtp.export_opts

    self.feedback_fn("%s is now listening, starting export" % dtp.data.name)

    # Start export on source node
    de = DiskExport(self.lu, self.src_node, dtp.export_opts,
                    self.dest_ip, ie.listen_port, self.instance,
                    component, dtp.data.src_io, dtp.data.src_ioargs,
                    self.timeouts, self.src_cbs, private=dtp)
    ie.loop.Add(de)

    dtp.src_export = de

  def ReportConnected(self, ie, dtp):
    """Called when a connection has been established.

    """
    self.feedback_fn("%s is receiving data on %s" %
                     (dtp.data.name, self.dest_node))

  def ReportFinished(self, ie, dtp):
    """Called when a transfer has finished.

    """
    if ie.success:
      self.feedback_fn("%s finished receiving data" % dtp.data.name)
    else:
      self.feedback_fn("%s failed to receive data: %s (recent output: %s)" %
                       (dtp.data.name, ie.final_message, ie.recent_output))

    dtp.RecordResult(ie.success)

    # TODO: Check whether sending SIGTERM right away is okay, maybe we should
    # give the daemon a moment to sort things out
    if dtp.src_export and not ie.success:
      dtp.src_export.Abort()
class DiskTransfer(object):
  def __init__(self, name, src_io, src_ioargs, dest_io, dest_ioargs,
               finished_fn):
    """Initializes this class.

    @type name: string
    @param name: User-visible name for this transfer (e.g. "disk/0")
    @param src_io: Source I/O type
    @param src_ioargs: Source I/O arguments
    @param dest_io: Destination I/O type
    @param dest_ioargs: Destination I/O arguments
    @type finished_fn: callable
    @param finished_fn: Function called once transfer has finished

    """
    # All attributes are read by the transfer callbacks (e.g. "data.name",
    # "data.src_io"), so every parameter must be stored
    self.name = name
    self.src_io = src_io
    self.src_ioargs = src_ioargs

    self.dest_io = dest_io
    self.dest_ioargs = dest_ioargs

    self.finished_fn = finished_fn
class _DiskTransferPrivate(object):
  def __init__(self, data, success, export_opts):
    """Initializes this class.

    @type data: L{DiskTransfer}
    @param data: Disk transfer data
    @type success: bool
    @param success: Initial success status
    @param export_opts: Options for the export daemon

    """
    # "data" is read by the callbacks as "dtp.data", so it must be stored
    self.data = data
    self.success = success
    self.export_opts = export_opts

    self.src_export = None
    self.dest_import = None

  def RecordResult(self, success):
    """Updates the status.

    One failed part will cause the whole transfer to fail.

    """
    self.success = self.success and success
def _GetInstDiskMagic(base, instance_name, index):
  """Computes the magic value for a disk export or import.

  @type base: string
  @param base: Random seed value (can be the same for all disks of a transfer)
  @type instance_name: string
  @param instance_name: Name of instance
  @type index: number
  @param index: Disk index

  """
  h = compat.sha1_hash()
  h.update(str(constants.RIE_VERSION))
  # The seed must contribute to the digest, otherwise the "base" parameter
  # would have no effect and magics would collide across transfers
  h.update(base)
  h.update(instance_name)
  h.update(str(index))
  return h.hexdigest()
def TransferInstanceData(lu, feedback_fn, src_node, dest_node, dest_ip,
                         instance, all_transfers):
  """Transfers an instance's data from one node to another.

  @param lu: Logical unit instance
  @param feedback_fn: Feedback function
  @type src_node: string
  @param src_node: Source node name
  @type dest_node: string
  @param dest_node: Destination node name
  @type dest_ip: string
  @param dest_ip: IP address of destination node
  @type instance: L{objects.Instance}
  @param instance: Instance object
  @type all_transfers: list of L{DiskTransfer} instances
  @param all_transfers: List of all disk transfers to be made
  @rtype: list
  @return: List with a boolean (True=successful, False=failed) for success for
    each transfer

  """
  # Disable compression for all moves as these are all within the same cluster
  compress = constants.IEC_NONE

  logging.debug("Source node %s, destination node %s, compression '%s'",
                src_node, dest_node, compress)

  timeouts = ImportExportTimeouts(constants.DISK_TRANSFER_CONNECT_TIMEOUT)
  src_cbs = _TransferInstSourceCb(lu, feedback_fn, instance, timeouts,
                                  src_node, None, dest_node, dest_ip)
  dest_cbs = _TransferInstDestCb(lu, feedback_fn, instance, timeouts,
                                 src_node, src_cbs, dest_node, dest_ip)

  all_dtp = []

  base_magic = utils.GenerateSecret(6)

  ieloop = ImportExportLoop(lu)
  try:
    for idx, transfer in enumerate(all_transfers):
      if transfer:
        feedback_fn("Exporting %s from %s to %s" %
                    (transfer.name, src_node, dest_node))

        magic = _GetInstDiskMagic(base_magic, instance.name, idx)
        opts = objects.ImportExportOptions(key_name=None, ca_pem=None,
                                           compress=compress, magic=magic)

        dtp = _DiskTransferPrivate(transfer, True, opts)

        di = DiskImport(lu, dest_node, opts, instance, "disk%d" % idx,
                        transfer.dest_io, transfer.dest_ioargs,
                        timeouts, dest_cbs, private=dtp)
        ieloop.Add(di)

        dtp.dest_import = di
      else:
        # Placeholder for a skipped disk; recorded as failed
        dtp = _DiskTransferPrivate(None, False, None)

      all_dtp.append(dtp)

    ieloop.Run()
  finally:
    ieloop.FinalizeAll()

  assert len(all_dtp) == len(all_transfers)
  assert compat.all((dtp.src_export is None or
                     dtp.src_export.success is not None) and
                    (dtp.dest_import is None or
                     dtp.dest_import.success is not None)
                    for dtp in all_dtp), \
         "Not all imports/exports are finalized"

  return [bool(dtp.success) for dtp in all_dtp]
class _RemoteExportCb(ImportExportCbBase):
  def __init__(self, feedback_fn, disk_count):
    """Initializes this class.

    @param feedback_fn: Feedback function
    @type disk_count: number
    @param disk_count: Number of disks

    """
    ImportExportCbBase.__init__(self)
    self._feedback_fn = feedback_fn
    self._dresults = [None] * disk_count

  @property
  def disk_results(self):
    """Returns per-disk results.

    """
    return self._dresults

  def ReportConnected(self, ie, private):
    """Called when a connection has been established.

    """
    # "private" is the (index, finished_fn) pair passed to DiskExport
    (idx, _) = private

    self._feedback_fn("Disk %s is now sending data" % idx)

  def ReportProgress(self, ie, private):
    """Called when new progress information should be reported.

    """
    (idx, _) = private

    progress = ie.progress
    if not progress:
      return

    self._feedback_fn("Disk %s sent %s" % (idx, FormatProgress(progress)))

  def ReportFinished(self, ie, private):
    """Called when a transfer has finished.

    """
    (idx, finished_fn) = private

    if ie.success:
      self._feedback_fn("Disk %s finished sending data" % idx)
    else:
      self._feedback_fn("Disk %s failed to send data: %s (recent output: %s)" %
                        (idx, ie.final_message, ie.recent_output))

    self._dresults[idx] = bool(ie.success)

    if finished_fn:
      finished_fn()
class ExportInstanceHelper:
  def __init__(self, lu, feedback_fn, instance):
    """Initializes this class.

    @param lu: Logical unit instance
    @param feedback_fn: Feedback function
    @type instance: L{objects.Instance}
    @param instance: Instance object

    """
    self._lu = lu
    self._feedback_fn = feedback_fn
    self._instance = instance

    self._snap_disks = []
    self._removed_snaps = [False] * len(instance.disks)

  def CreateSnapshots(self):
    """Creates an LVM snapshot for every disk of the instance.

    """
    assert not self._snap_disks

    instance = self._instance
    src_node = instance.primary_node

    for idx, disk in enumerate(instance.disks):
      self._feedback_fn("Creating a snapshot of disk/%s on node %s" %
                        (idx, src_node))

      # result.payload will be a snapshot of an lvm leaf of the one we
      # passed
      result = self._lu.rpc.call_blockdev_snapshot(src_node, disk)

      # False marks a failed snapshot so indexes stay aligned with the disks
      new_dev = False
      msg = result.fail_msg
      if msg:
        self._lu.LogWarning("Could not snapshot disk/%s on node %s: %s",
                            idx, src_node, msg)
      elif (not isinstance(result.payload, (tuple, list)) or
            len(result.payload) != 2):
        self._lu.LogWarning("Could not snapshot disk/%s on node %s: invalid"
                            " result '%s'", idx, src_node, result.payload)
      else:
        disk_id = tuple(result.payload)
        disk_params = constants.DISK_LD_DEFAULTS[constants.LD_LV].copy()
        new_dev = objects.Disk(dev_type=constants.LD_LV, size=disk.size,
                               logical_id=disk_id, physical_id=disk_id,
                               iv_name=disk.iv_name,
                               params=disk_params)

      self._snap_disks.append(new_dev)

    assert len(self._snap_disks) == len(instance.disks)
    assert len(self._removed_snaps) == len(instance.disks)

  def _RemoveSnapshot(self, disk_index):
    """Removes an LVM snapshot.

    @type disk_index: number
    @param disk_index: Index of the snapshot to be removed

    """
    disk = self._snap_disks[disk_index]
    if disk and not self._removed_snaps[disk_index]:
      src_node = self._instance.primary_node

      self._feedback_fn("Removing snapshot of disk/%s on node %s" %
                        (disk_index, src_node))

      result = self._lu.rpc.call_blockdev_remove(src_node, disk)
      if result.fail_msg:
        self._lu.LogWarning("Could not remove snapshot for disk/%d from node"
                            " %s: %s", disk_index, src_node, result.fail_msg)
      else:
        self._removed_snaps[disk_index] = True

  def LocalExport(self, dest_node):
    """Intra-cluster instance export.

    @type dest_node: L{objects.Node}
    @param dest_node: Destination node

    """
    instance = self._instance
    src_node = instance.primary_node

    assert len(self._snap_disks) == len(instance.disks)

    transfers = []

    for idx, dev in enumerate(self._snap_disks):
      if not dev:
        # Snapshot for this disk failed; keep indexes aligned
        transfers.append(None)
        continue

      path = utils.PathJoin(constants.EXPORT_DIR, "%s.new" % instance.name,
                            dev.physical_id[1])

      finished_fn = compat.partial(self._TransferFinished, idx)

      # FIXME: pass debug option from opcode to backend
      dt = DiskTransfer("snapshot/%s" % idx,
                        constants.IEIO_SCRIPT, (dev, idx),
                        constants.IEIO_FILE, (path, ),
                        finished_fn)
      transfers.append(dt)

    # Actually export data
    dresults = TransferInstanceData(self._lu, self._feedback_fn,
                                    src_node, dest_node.name,
                                    dest_node.secondary_ip,
                                    instance, transfers)

    assert len(dresults) == len(instance.disks)

    self._feedback_fn("Finalizing export on %s" % dest_node.name)

    result = self._lu.rpc.call_finalize_export(dest_node.name, instance,
                                               self._snap_disks)
    msg = result.fail_msg
    fin_resu = not msg
    if msg:
      self._lu.LogWarning("Could not finalize export for instance %s"
                          " on node %s: %s", instance.name, dest_node.name,
                          msg)

    return (fin_resu, dresults)

  def RemoteExport(self, disk_info, key_name, dest_ca_pem, timeouts):
    """Inter-cluster instance export.

    @type disk_info: list
    @param disk_info: Per-disk destination information
    @type key_name: string
    @param key_name: Name of X509 key to use
    @type dest_ca_pem: string
    @param dest_ca_pem: Destination X509 CA in PEM format
    @type timeouts: L{ImportExportTimeouts}
    @param timeouts: Timeouts for this import

    """
    instance = self._instance

    assert len(disk_info) == len(instance.disks)

    cbs = _RemoteExportCb(self._feedback_fn, len(instance.disks))

    ieloop = ImportExportLoop(self._lu)
    try:
      for idx, (dev, (host, port, magic)) in enumerate(zip(instance.disks,
                                                           disk_info)):
        # Decide whether to use IPv6
        ipv6 = netutils.IP6Address.IsValid(host)

        opts = objects.ImportExportOptions(key_name=key_name,
                                           ca_pem=dest_ca_pem,
                                           magic=magic, ipv6=ipv6)

        self._feedback_fn("Sending disk %s to %s:%s" % (idx, host, port))
        finished_fn = compat.partial(self._TransferFinished, idx)
        ieloop.Add(DiskExport(self._lu, instance.primary_node,
                              opts, host, port, instance, "disk%d" % idx,
                              constants.IEIO_SCRIPT, (dev, idx),
                              timeouts, cbs, private=(idx, finished_fn)))

      ieloop.Run()
    finally:
      ieloop.FinalizeAll()

    return (True, cbs.disk_results)

  def _TransferFinished(self, idx):
    """Called once a transfer has finished.

    @type idx: number
    @param idx: Disk index

    """
    logging.debug("Transfer %s finished", idx)
    self._RemoveSnapshot(idx)

  def Cleanup(self):
    """Remove all snapshots.

    """
    assert len(self._removed_snaps) == len(self._instance.disks)
    for idx in range(len(self._instance.disks)):
      self._RemoveSnapshot(idx)
class _RemoteImportCb(ImportExportCbBase):
  def __init__(self, feedback_fn, cds, x509_cert_pem, disk_count,
               external_address):
    """Initializes this class.

    @param feedback_fn: Feedback function
    @type cds: string
    @param cds: Cluster domain secret
    @type x509_cert_pem: string
    @param x509_cert_pem: CA used for signing import key
    @type disk_count: number
    @param disk_count: Number of disks
    @type external_address: string
    @param external_address: External address of destination node

    """
    ImportExportCbBase.__init__(self)
    self._feedback_fn = feedback_fn
    # The cluster domain secret is needed later to compute the per-disk
    # import information
    self._cds = cds
    self._x509_cert_pem = x509_cert_pem
    self._disk_count = disk_count
    self._external_address = external_address

    self._dresults = [None] * disk_count
    self._daemon_port = [None] * disk_count

    self._salt = utils.GenerateSecret(8)

  @property
  def disk_results(self):
    """Returns per-disk results.

    """
    return self._dresults

  def _CheckAllListening(self):
    """Checks whether all daemons are listening.

    If all daemons are listening, the information is sent to the client.

    """
    if not compat.all(dp is not None for dp in self._daemon_port):
      return

    host = self._external_address

    disks = []
    for idx, (port, magic) in enumerate(self._daemon_port):
      disks.append(ComputeRemoteImportDiskInfo(self._cds, self._salt,
                                               idx, host, port, magic))

    assert len(disks) == self._disk_count

    self._feedback_fn(constants.ELOG_REMOTE_IMPORT, {
      "disks": disks,
      "x509_ca": self._x509_cert_pem,
      })

  def ReportListening(self, ie, private, _):
    """Called when daemon started listening.

    """
    (idx, ) = private

    self._feedback_fn("Disk %s is now listening" % idx)

    assert self._daemon_port[idx] is None

    self._daemon_port[idx] = (ie.listen_port, ie.magic)

    self._CheckAllListening()

  def ReportConnected(self, ie, private):
    """Called when a connection has been established.

    """
    (idx, ) = private

    self._feedback_fn("Disk %s is now receiving data" % idx)

  def ReportFinished(self, ie, private):
    """Called when a transfer has finished.

    """
    (idx, ) = private

    # Daemon is certainly no longer listening
    self._daemon_port[idx] = None

    if ie.success:
      self._feedback_fn("Disk %s finished receiving data" % idx)
    else:
      self._feedback_fn(("Disk %s failed to receive data: %s"
                         " (recent output: %s)") %
                        (idx, ie.final_message, ie.recent_output))

    self._dresults[idx] = bool(ie.success)
def RemoteImport(lu, feedback_fn, instance, pnode, source_x509_ca,
  """Imports an instance from another cluster.

  @param lu: Logical unit instance
  @param feedback_fn: Feedback function
  @type instance: L{objects.Instance}
  @param instance: Instance object
  @type pnode: L{objects.Node}
  @param pnode: Primary node of instance as an object
  @type source_x509_ca: OpenSSL.crypto.X509
  @param source_x509_ca: Import source's X509 CA
  @type cds: string
  @param cds: Cluster domain secret
  @type timeouts: L{ImportExportTimeouts}
  @param timeouts: Timeouts for this import

  """
  source_ca_pem = OpenSSL.crypto.dump_certificate(OpenSSL.crypto.FILETYPE_PEM,
  magic_base = utils.GenerateSecret(6)

  # Decide whether to use IPv6
  ipv6 = netutils.IP6Address.IsValid(pnode.primary_ip)

  # Create a new key/certificate pair on the importing node; the certificate
  # is later signed using the cluster domain secret
  result = lu.rpc.call_x509_cert_create(instance.primary_node,
                                        constants.RIE_CERT_VALIDITY)
  result.Raise("Can't create X509 key and certificate on %s" % result.node)

  (x509_key_name, x509_cert_pem) = result.payload

  x509_cert = OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_PEM,

  # Sign the certificate so the exporting cluster can verify it
  signed_x509_cert_pem = \
    utils.SignX509Certificate(x509_cert, cds, utils.GenerateSecret(8))

  cbs = _RemoteImportCb(feedback_fn, cds, signed_x509_cert_pem,
                        len(instance.disks), pnode.primary_ip)

  ieloop = ImportExportLoop(lu)
  # One import daemon per disk, each with its own magic value
  for idx, dev in enumerate(instance.disks):
    magic = _GetInstDiskMagic(magic_base, instance.name, idx)

    # Import daemon options
    opts = objects.ImportExportOptions(key_name=x509_key_name,
                                       ca_pem=source_ca_pem,
                                       magic=magic, ipv6=ipv6)

    ieloop.Add(DiskImport(lu, instance.primary_node, opts, instance,
                          constants.IEIO_SCRIPT, (dev, idx),
                          timeouts, cbs, private=(idx, )))

  ieloop.FinalizeAll()

  # Remove crypto key and certificate
  result = lu.rpc.call_x509_cert_remove(instance.primary_node, x509_key_name)
  result.Raise("Can't remove X509 key and certificate on %s" % result.node)

  return cbs.disk_results
def _GetImportExportHandshakeMessage(version):
  """Builds the plain-text handshake message for a RIE protocol version.

  @type version: number
  @param version: Remote import/export protocol version
  @rtype: string
  @return: Message of the form C{"<version>:<RIE_HANDSHAKE>"}

  """
  return ":".join([str(version), constants.RIE_HANDSHAKE])
def ComputeRemoteExportHandshake(cds):
  """Computes the remote import/export handshake.

  @type cds: string
  @param cds: Cluster domain secret
  @rtype: tuple
  @return: Triple of (protocol version, HMAC digest, salt)

  """
  version = constants.RIE_VERSION
  # A fresh salt per handshake prevents digest reuse
  hmac_salt = utils.GenerateSecret(8)
  digest = utils.Sha1Hmac(cds, _GetImportExportHandshakeMessage(version),
                          salt=hmac_salt)
  return (version, digest, hmac_salt)
def CheckRemoteExportHandshake(cds, handshake):
  """Checks the handshake of a remote import/export.

  @type cds: string
  @param cds: Cluster domain secret
  @type handshake: sequence
  @param handshake: Handshake sent by remote peer
  @return: Error message as a string if the handshake is invalid
    (NOTE(review): the success-path return is not visible in this hunk --
    presumably C{None}; confirm)

  """
    (version, hmac_digest, hmac_salt) = handshake
  except (TypeError, ValueError), err:
    return "Invalid data: %s" % err

  # Verify the HMAC before trusting the version number it covers
  if not utils.VerifySha1Hmac(cds, _GetImportExportHandshakeMessage(version),
                              hmac_digest, salt=hmac_salt):
    return "Hash didn't match, clusters don't share the same domain secret"

  if version != constants.RIE_VERSION:
    return ("Clusters don't have the same remote import/export protocol"
            " (local=%s, remote=%s)" %
            (constants.RIE_VERSION, version))
def _GetRieDiskInfoMessage(disk_index, host, port, magic):
  """Builds the text that gets hashed for import/export disk information.

  @type disk_index: number
  @param disk_index: Index of disk (included in hash)
  @type host: string
  @param host: Hostname
  @type port: number
  @param port: Daemon port
  @type magic: string
  @param magic: Magic value
  @rtype: string
  @return: Colon-separated C{"<index>:<host>:<port>:<magic>"}

  """
  fields = (disk_index, host, port, magic)
  return ":".join(str(value) for value in fields)
def CheckRemoteExportDiskInfo(cds, disk_index, disk_info):
  """Verifies received disk information for an export.

  @type cds: string
  @param cds: Cluster domain secret
  @type disk_index: number
  @param disk_index: Index of disk (included in hash)
  @type disk_info: sequence
  @param disk_info: Disk information sent by remote peer
  @raise errors.GenericError: if the data is malformed, incomplete or the
    HMAC does not verify

  """
    (host, port, magic, hmac_digest, hmac_salt) = disk_info
  except (TypeError, ValueError), err:
    raise errors.GenericError("Invalid data: %s" % err)

  if not (host and port and magic):
    raise errors.GenericError("Missing destination host, port or magic")

  # Recompute the message locally and verify the peer's signature using the
  # shared cluster domain secret
  msg = _GetRieDiskInfoMessage(disk_index, host, port, magic)

  if not utils.VerifySha1Hmac(cds, msg, hmac_digest, salt=hmac_salt):
    raise errors.GenericError("HMAC is wrong")

  # NOTE(review): branch handling here looks truncated in this hunk --
  # normalization would usually apply to hostnames, not literal IP
  # addresses; confirm the intended if/else structure
  if netutils.IP6Address.IsValid(host) or netutils.IP4Address.IsValid(host):
    destination = netutils.Hostname.GetNormalizedName(host)

  return (destination,
          utils.ValidateServiceName(port),
def ComputeRemoteImportDiskInfo(cds, salt, disk_index, host, port, magic):
  """Computes the signed disk information for a remote import.

  @type cds: string
  @param cds: Cluster domain secret
  @type salt: string
  @param salt: HMAC salt
  @type disk_index: number
  @param disk_index: Index of disk (included in hash)
  @type host: string
  @param host: Hostname
  @type port: number
  @param port: Daemon port
  @type magic: string
  @param magic: Magic value
  @rtype: tuple
  @return: (host, port, magic, HMAC digest, salt)

  """
  # Sign the canonical disk-info message so the peer can verify it with the
  # same cluster domain secret
  digest = utils.Sha1Hmac(cds,
                          _GetRieDiskInfoMessage(disk_index, host, port,
                                                 magic),
                          salt=salt)
  return (host, port, magic, digest, salt)