4 # Copyright (C) 2010, 2011 Google Inc.
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 # General Public License for more details.
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
"""Instance-related functions and classes for masterd.

"""
import logging
import time

from ganeti import constants
from ganeti import errors
from ganeti import compat
from ganeti import utils
from ganeti import objects
from ganeti import netutils
38 class _ImportExportError(Exception):
39 """Local exception to report import/export errors.
class ImportExportTimeouts(object):
  """Timeout settings for a single disk import/export.

  """
  #: Time until daemon starts writing status file
  DEFAULT_READY_TIMEOUT = 10

  #: Length of time until errors cause hard failure
  DEFAULT_ERROR_TIMEOUT = 10

  #: Time after which daemon must be listening
  DEFAULT_LISTEN_TIMEOUT = 10

  #: Progress update interval
  DEFAULT_PROGRESS_INTERVAL = 60

  __slots__ = [
    "error",
    "ready",
    "listen",
    "connect",
    "progress",
    ]

  def __init__(self, connect,
               listen=DEFAULT_LISTEN_TIMEOUT,
               error=DEFAULT_ERROR_TIMEOUT,
               ready=DEFAULT_READY_TIMEOUT,
               progress=DEFAULT_PROGRESS_INTERVAL):
    """Initializes this class.

    @type connect: number
    @param connect: Timeout for establishing connection
    @type listen: number
    @param listen: Timeout for starting to listen for connections
    @type error: number
    @param error: Length of time until errors cause hard failure
    @type ready: number
    @param ready: Timeout for daemon to become ready
    @type progress: number
    @param progress: Progress update interval

    """
    # All values must be stored; consumers read them via
    # self._timeouts.{error,ready,listen,connect,progress}
    self.error = error
    self.ready = ready
    self.listen = listen
    self.connect = connect
    self.progress = progress
class ImportExportCbBase(object):
  """Callbacks for disk import/export.

  All callbacks are no-ops by default; subclasses override the ones they
  need.

  """
  def ReportListening(self, ie, private, component):
    """Called when daemon started listening.

    @type ie: Subclass of L{_DiskImportExportBase}
    @param ie: Import/export object
    @param private: Private data passed to import/export object
    @param component: transfer component name

    """

  def ReportConnected(self, ie, private):
    """Called when a connection has been established.

    @type ie: Subclass of L{_DiskImportExportBase}
    @param ie: Import/export object
    @param private: Private data passed to import/export object

    """

  def ReportProgress(self, ie, private):
    """Called when new progress information should be reported.

    @type ie: Subclass of L{_DiskImportExportBase}
    @param ie: Import/export object
    @param private: Private data passed to import/export object

    """

  def ReportFinished(self, ie, private):
    """Called when a transfer has finished.

    @type ie: Subclass of L{_DiskImportExportBase}
    @param ie: Import/export object
    @param private: Private data passed to import/export object

    """
133 def _TimeoutExpired(epoch, timeout, _time_fn=time.time):
134 """Checks whether a timeout has expired.
137 return _time_fn() > (epoch + timeout)
140 class _DiskImportExportBase(object):
143 def __init__(self, lu, node_name, opts,
144 instance, component, timeouts, cbs, private=None):
145 """Initializes this class.
147 @param lu: Logical unit instance
148 @type node_name: string
149 @param node_name: Node name for import
150 @type opts: L{objects.ImportExportOptions}
151 @param opts: Import/export daemon options
152 @type instance: L{objects.Instance}
153 @param instance: Instance object
154 @type component: string
155 @param component: which part of the instance is being imported
156 @type timeouts: L{ImportExportTimeouts}
157 @param timeouts: Timeouts for this import
158 @type cbs: L{ImportExportCbBase}
159 @param cbs: Callbacks
160 @param private: Private data for callback functions
163 assert self.MODE_TEXT
166 self.node_name = node_name
167 self._opts = opts.Copy()
168 self._instance = instance
169 self._component = component
170 self._timeouts = timeouts
172 self._private = private
174 # Set master daemon's timeout in options for import/export daemon
175 assert self._opts.connect_timeout is None
176 self._opts.connect_timeout = timeouts.connect
182 self._ts_begin = None
183 self._ts_connected = None
184 self._ts_finished = None
185 self._ts_cleanup = None
186 self._ts_last_progress = None
187 self._ts_last_error = None
191 self.final_message = None
194 self._daemon_name = None
198 def recent_output(self):
199 """Returns the most recent output from the daemon.
203 return "\n".join(self._daemon.recent_output)
209 """Returns transfer progress information.
215 return (self._daemon.progress_mbytes,
216 self._daemon.progress_throughput,
217 self._daemon.progress_percent,
218 self._daemon.progress_eta)
222 """Returns the magic value for this import/export.
225 return self._opts.magic
229 """Determines whether this transport is still active.
232 return self.success is None
236 """Returns parent loop.
238 @rtype: L{ImportExportLoop}
243 def SetLoop(self, loop):
244 """Sets the parent loop.
246 @type loop: L{ImportExportLoop}
250 raise errors.ProgrammerError("Loop can only be set once")
254 def _StartDaemon(self):
255 """Starts the import/export daemon.
258 raise NotImplementedError()
260 def CheckDaemon(self):
261 """Checks whether daemon has been started and if not, starts it.
267 assert self._ts_cleanup is None
269 if self._daemon_name is None:
270 assert self._ts_begin is None
272 result = self._StartDaemon()
274 raise _ImportExportError("Failed to start %s on %s: %s" %
275 (self.MODE_TEXT, self.node_name,
278 daemon_name = result.payload
280 logging.info("Started %s '%s' on %s", self.MODE_TEXT, daemon_name,
283 self._ts_begin = time.time()
284 self._daemon_name = daemon_name
286 return self._daemon_name
288 def GetDaemonName(self):
289 """Returns the daemon name.
292 assert self._daemon_name, "Daemon has not been started"
293 assert self._ts_cleanup is None
294 return self._daemon_name
297 """Sends SIGTERM to import/export daemon (if still active).
300 if self._daemon_name:
301 self._lu.LogWarning("Aborting %s '%s' on %s",
302 self.MODE_TEXT, self._daemon_name, self.node_name)
303 result = self._lu.rpc.call_impexp_abort(self.node_name, self._daemon_name)
305 self._lu.LogWarning("Failed to abort %s '%s' on %s: %s",
306 self.MODE_TEXT, self._daemon_name,
307 self.node_name, result.fail_msg)
312 def _SetDaemonData(self, data):
313 """Internal function for updating status daemon data.
315 @type data: L{objects.ImportExportStatus}
316 @param data: Daemon status data
319 assert self._ts_begin is not None
322 if _TimeoutExpired(self._ts_begin, self._timeouts.ready):
323 raise _ImportExportError("Didn't become ready after %s seconds" %
324 self._timeouts.ready)
332 def SetDaemonData(self, success, data):
333 """Updates daemon status data.
336 @param success: Whether fetching data was successful or not
337 @type data: L{objects.ImportExportStatus}
338 @param data: Daemon status data
342 if self._ts_last_error is None:
343 self._ts_last_error = time.time()
345 elif _TimeoutExpired(self._ts_last_error, self._timeouts.error):
346 raise _ImportExportError("Too many errors while updating data")
350 self._ts_last_error = None
352 return self._SetDaemonData(data)
354 def CheckListening(self):
355 """Checks whether the daemon is listening.
358 raise NotImplementedError()
360 def _GetConnectedCheckEpoch(self):
361 """Returns timeout to calculate connect timeout.
364 raise NotImplementedError()
366 def CheckConnected(self):
367 """Checks whether the daemon is connected.
370 @return: Whether the daemon is connected
373 assert self._daemon, "Daemon status missing"
375 if self._ts_connected is not None:
378 if self._daemon.connected:
379 self._ts_connected = time.time()
381 # TODO: Log remote peer
382 logging.debug("%s '%s' on %s is now connected",
383 self.MODE_TEXT, self._daemon_name, self.node_name)
385 self._cbs.ReportConnected(self, self._private)
389 if _TimeoutExpired(self._GetConnectedCheckEpoch(), self._timeouts.connect):
390 raise _ImportExportError("Not connected after %s seconds" %
391 self._timeouts.connect)
395 def _CheckProgress(self):
396 """Checks whether a progress update should be reported.
399 if ((self._ts_last_progress is None or
400 _TimeoutExpired(self._ts_last_progress, self._timeouts.progress)) and
402 self._daemon.progress_mbytes is not None and
403 self._daemon.progress_throughput is not None):
404 self._cbs.ReportProgress(self, self._private)
405 self._ts_last_progress = time.time()
407 def CheckFinished(self):
408 """Checks whether the daemon exited.
411 @return: Whether the transfer is finished
414 assert self._daemon, "Daemon status missing"
416 if self._ts_finished:
419 if self._daemon.exit_status is None:
420 # TODO: Adjust delay for ETA expiring soon
421 self._CheckProgress()
424 self._ts_finished = time.time()
426 self._ReportFinished(self._daemon.exit_status == 0,
427 self._daemon.error_message)
431 def _ReportFinished(self, success, message):
432 """Transfer is finished or daemon exited.
435 @param success: Whether the transfer was successful
436 @type message: string
437 @param message: Error message
440 assert self.success is None
442 self.success = success
443 self.final_message = message
446 logging.info("%s '%s' on %s succeeded", self.MODE_TEXT,
447 self._daemon_name, self.node_name)
448 elif self._daemon_name:
449 self._lu.LogWarning("%s '%s' on %s failed: %s",
450 self.MODE_TEXT, self._daemon_name, self.node_name,
453 self._lu.LogWarning("%s on %s failed: %s", self.MODE_TEXT,
454 self.node_name, message)
456 self._cbs.ReportFinished(self, self._private)
459 """Makes the RPC call to finalize this import/export.
462 return self._lu.rpc.call_impexp_cleanup(self.node_name, self._daemon_name)
464 def Finalize(self, error=None):
465 """Finalizes this import/export.
468 if self._daemon_name:
469 logging.info("Finalizing %s '%s' on %s",
470 self.MODE_TEXT, self._daemon_name, self.node_name)
472 result = self._Finalize()
474 self._lu.LogWarning("Failed to finalize %s '%s' on %s: %s",
475 self.MODE_TEXT, self._daemon_name,
476 self.node_name, result.fail_msg)
479 # Daemon is no longer running
480 self._daemon_name = None
481 self._ts_cleanup = time.time()
484 self._ReportFinished(False, error)
class DiskImport(_DiskImportExportBase):
  """Handles one disk import on the destination node.

  """
  MODE_TEXT = "import"

  def __init__(self, lu, node_name, opts, instance, component,
               dest, dest_args, timeouts, cbs, private=None):
    """Initializes this class.

    @param lu: Logical unit instance
    @type node_name: string
    @param node_name: Node name for import
    @type opts: L{objects.ImportExportOptions}
    @param opts: Import/export daemon options
    @type instance: L{objects.Instance}
    @param instance: Instance object
    @type component: string
    @param component: which part of the instance is being imported
    @param dest: I/O destination
    @param dest_args: I/O arguments
    @type timeouts: L{ImportExportTimeouts}
    @param timeouts: Timeouts for this import
    @type cbs: L{ImportExportCbBase}
    @param cbs: Callbacks
    @param private: Private data for callback functions

    """
    _DiskImportExportBase.__init__(self, lu, node_name, opts, instance,
                                   component, timeouts, cbs, private)
    self._dest = dest
    self._dest_args = dest_args

    # Timestamps
    self._ts_listening = None

  @property
  def listen_port(self):
    """Returns the port the daemon is listening on.

    """
    if self._daemon:
      return self._daemon.listen_port

    return None

  def _StartDaemon(self):
    """Starts the import daemon.

    """
    return self._lu.rpc.call_import_start(self.node_name, self._opts,
                                          self._instance, self._component,
                                          self._dest, self._dest_args)

  def CheckListening(self):
    """Checks whether the daemon is listening.

    @rtype: bool
    @return: Whether the daemon is listening

    """
    assert self._daemon, "Daemon status missing"

    if self._ts_listening is not None:
      return True

    port = self._daemon.listen_port
    if port is not None:
      self._ts_listening = time.time()

      logging.debug("Import '%s' on %s is now listening on port %s",
                    self._daemon_name, self.node_name, port)

      self._cbs.ReportListening(self, self._private, self._component)

      return True

    if _TimeoutExpired(self._ts_begin, self._timeouts.listen):
      raise _ImportExportError("Not listening after %s seconds" %
                               self._timeouts.listen)

    return False

  def _GetConnectedCheckEpoch(self):
    """Returns the time since we started listening.

    """
    assert self._ts_listening is not None, \
      ("Checking whether an import is connected is only useful"
       " once it's been listening")

    return self._ts_listening
class DiskExport(_DiskImportExportBase):
  """Handles one disk export on the source node.

  """
  MODE_TEXT = "export"

  def __init__(self, lu, node_name, opts, dest_host, dest_port,
               instance, component, source, source_args,
               timeouts, cbs, private=None):
    """Initializes this class.

    @param lu: Logical unit instance
    @type node_name: string
    @param node_name: Node name for import
    @type opts: L{objects.ImportExportOptions}
    @param opts: Import/export daemon options
    @type dest_host: string
    @param dest_host: Destination host name or IP address
    @type dest_port: number
    @param dest_port: Destination port number
    @type instance: L{objects.Instance}
    @param instance: Instance object
    @type component: string
    @param component: which part of the instance is being imported
    @param source: I/O source
    @param source_args: I/O source arguments
    @type timeouts: L{ImportExportTimeouts}
    @param timeouts: Timeouts for this import
    @type cbs: L{ImportExportCbBase}
    @param cbs: Callbacks
    @param private: Private data for callback functions

    """
    _DiskImportExportBase.__init__(self, lu, node_name, opts, instance,
                                   component, timeouts, cbs, private)
    self._dest_host = dest_host
    self._dest_port = dest_port
    self._source = source
    self._source_args = source_args

  def _StartDaemon(self):
    """Starts the export daemon.

    """
    return self._lu.rpc.call_export_start(self.node_name, self._opts,
                                          self._dest_host, self._dest_port,
                                          self._instance, self._component,
                                          self._source, self._source_args)

  def CheckListening(self):
    """Checks whether the daemon is listening.

    """
    # Only an import can be listening
    return True

  def _GetConnectedCheckEpoch(self):
    """Returns the time since the daemon started.

    """
    assert self._ts_begin is not None

    return self._ts_begin
def FormatProgress(progress):
  """Formats progress information for user consumption

  @param progress: Tuple of (mbytes, throughput, percent, eta)
  @rtype: string
  @return: Human-readable progress description

  """
  (mbytes, throughput, percent, eta) = progress

  parts = [
    utils.FormatUnit(mbytes, "h"),

    # Not using FormatUnit as it doesn't support kilobytes
    "%0.1f MiB/s" % throughput,
    ]

  if percent is not None:
    parts.append("%d%%" % percent)

  if eta is not None:
    parts.append("ETA %s" % utils.FormatSeconds(eta))

  return utils.CommaJoin(parts)
664 class ImportExportLoop:
668 def __init__(self, lu):
669 """Initializes this class.
674 self._pending_add = []
676 def Add(self, diskie):
677 """Adds an import/export object to the loop.
679 @type diskie: Subclass of L{_DiskImportExportBase}
680 @param diskie: Import/export object
683 assert diskie not in self._pending_add
684 assert diskie.loop is None
688 # Adding new objects to a staging list is necessary, otherwise the main
689 # loop gets confused if callbacks modify the queue while the main loop is
691 self._pending_add.append(diskie)
694 def _CollectDaemonStatus(lu, daemons):
695 """Collects the status for all import/export daemons.
700 for node_name, names in daemons.iteritems():
701 result = lu.rpc.call_impexp_status(node_name, names)
703 lu.LogWarning("Failed to get daemon status on %s: %s",
704 node_name, result.fail_msg)
707 assert len(names) == len(result.payload)
709 daemon_status[node_name] = dict(zip(names, result.payload))
714 def _GetActiveDaemonNames(queue):
715 """Gets the names of all active daemons.
720 if not diskie.active:
724 # Start daemon if necessary
725 daemon_name = diskie.CheckDaemon()
726 except _ImportExportError, err:
727 logging.exception("%s failed", diskie.MODE_TEXT)
728 diskie.Finalize(error=str(err))
731 result.setdefault(diskie.node_name, []).append(daemon_name)
733 assert len(queue) >= len(result)
734 assert len(queue) >= sum([len(names) for names in result.itervalues()])
736 logging.debug("daemons=%r", result)
740 def _AddPendingToQueue(self):
741 """Adds all pending import/export objects to the internal queue.
744 assert compat.all(diskie not in self._queue and diskie.loop == self
745 for diskie in self._pending_add)
747 self._queue.extend(self._pending_add)
749 del self._pending_add[:]
752 """Utility main loop.
756 self._AddPendingToQueue()
758 # Collect all active daemon names
759 daemons = self._GetActiveDaemonNames(self._queue)
763 # Collection daemon status data
764 data = self._CollectDaemonStatus(self._lu, daemons)
767 delay = self.MAX_DELAY
768 for diskie in self._queue:
769 if not diskie.active:
774 all_daemon_data = data[diskie.node_name]
776 result = diskie.SetDaemonData(False, None)
779 diskie.SetDaemonData(True,
780 all_daemon_data[diskie.GetDaemonName()])
783 # Daemon not yet ready, retry soon
784 delay = min(3.0, delay)
787 if diskie.CheckFinished():
792 # Normal case: check again in 5 seconds
793 delay = min(5.0, delay)
795 if not diskie.CheckListening():
796 # Not yet listening, retry soon
797 delay = min(1.0, delay)
800 if not diskie.CheckConnected():
801 # Not yet connected, retry soon
802 delay = min(1.0, delay)
805 except _ImportExportError, err:
806 logging.exception("%s failed", diskie.MODE_TEXT)
807 diskie.Finalize(error=str(err))
809 if not compat.any(diskie.active for diskie in self._queue):
813 delay = min(self.MAX_DELAY, max(self.MIN_DELAY, delay))
814 logging.debug("Waiting for %ss", delay)
817 def FinalizeAll(self):
818 """Finalizes all pending transfers.
823 for diskie in self._queue:
824 success = diskie.Finalize() and success
class _TransferInstCbBase(ImportExportCbBase):
  """Base class for intra-cluster transfer callbacks.

  """
  def __init__(self, lu, feedback_fn, instance, timeouts, src_node, src_cbs,
               dest_node, dest_ip):
    """Initializes this class.

    @param lu: Logical unit instance
    @param feedback_fn: Feedback function
    @type instance: L{objects.Instance}
    @param instance: Instance object
    @type timeouts: L{ImportExportTimeouts}
    @param timeouts: Timeouts for this import
    @type src_node: string
    @param src_node: Source node name
    @type src_cbs: L{ImportExportCbBase}
    @param src_cbs: Callbacks for source import
    @type dest_node: string
    @param dest_node: Destination node name
    @type dest_ip: string
    @param dest_ip: Destination IP address

    """
    ImportExportCbBase.__init__(self)

    self.lu = lu
    self.feedback_fn = feedback_fn
    self.instance = instance
    self.timeouts = timeouts
    self.src_node = src_node
    self.src_cbs = src_cbs
    self.dest_node = dest_node
    self.dest_ip = dest_ip
class _TransferInstSourceCb(_TransferInstCbBase):
  """Callbacks for the exporting (source) side of an instance transfer.

  """
  def ReportConnected(self, ie, dtp):
    """Called when a connection has been established.

    """
    # This callback is used on the source side; no source callbacks are
    # nested below it
    assert self.src_cbs is None
    assert dtp.src_export == ie
    assert dtp.dest_import

    self.feedback_fn("%s is sending data on %s" %
                     (dtp.data.name, ie.node_name))

  def ReportProgress(self, ie, dtp):
    """Called when new progress information should be reported.

    """
    progress = ie.progress
    if not progress:
      return

    self.feedback_fn("%s sent %s" % (dtp.data.name, FormatProgress(progress)))

  def ReportFinished(self, ie, dtp):
    """Called when a transfer has finished.

    """
    assert self.src_cbs is None
    assert dtp.src_export == ie
    assert dtp.dest_import

    if ie.success:
      self.feedback_fn("%s finished sending data" % dtp.data.name)
    else:
      self.feedback_fn("%s failed to send data: %s (recent output: %s)" %
                       (dtp.data.name, ie.final_message, ie.recent_output))

    dtp.RecordResult(ie.success)

    cb = dtp.data.finished_fn
    if cb:
      cb()

    # TODO: Check whether sending SIGTERM right away is okay, maybe we should
    # give the daemon a moment to sort things out
    if dtp.dest_import and not ie.success:
      dtp.dest_import.Abort()
class _TransferInstDestCb(_TransferInstCbBase):
  """Callbacks for the importing (destination) side of an instance transfer.

  """
  def ReportListening(self, ie, dtp, component):
    """Called when daemon started listening.

    """
    assert self.src_cbs
    assert dtp.src_export is None
    assert dtp.dest_import
    assert dtp.export_opts

    self.feedback_fn("%s is now listening, starting export" % dtp.data.name)

    # Start export on source node
    de = DiskExport(self.lu, self.src_node, dtp.export_opts,
                    self.dest_ip, ie.listen_port, self.instance,
                    component, dtp.data.src_io, dtp.data.src_ioargs,
                    self.timeouts, self.src_cbs, private=dtp)
    ie.loop.Add(de)

    dtp.src_export = de

  def ReportConnected(self, ie, dtp):
    """Called when a connection has been established.

    """
    self.feedback_fn("%s is receiving data on %s" %
                     (dtp.data.name, self.dest_node))

  def ReportFinished(self, ie, dtp):
    """Called when a transfer has finished.

    """
    if ie.success:
      self.feedback_fn("%s finished receiving data" % dtp.data.name)
    else:
      self.feedback_fn("%s failed to receive data: %s (recent output: %s)" %
                       (dtp.data.name, ie.final_message, ie.recent_output))

    dtp.RecordResult(ie.success)

    # TODO: Check whether sending SIGTERM right away is okay, maybe we should
    # give the daemon a moment to sort things out
    if dtp.src_export and not ie.success:
      dtp.src_export.Abort()
class DiskTransfer(object):
  """Container describing a single disk transfer.

  """
  def __init__(self, name, src_io, src_ioargs, dest_io, dest_ioargs,
               finished_fn):
    """Initializes this class.

    @type name: string
    @param name: User-visible name for this transfer (e.g. "disk/0")
    @param src_io: Source I/O type
    @param src_ioargs: Source I/O arguments
    @param dest_io: Destination I/O type
    @param dest_ioargs: Destination I/O arguments
    @type finished_fn: callable
    @param finished_fn: Function called once transfer has finished

    """
    self.name = name

    self.src_io = src_io
    self.src_ioargs = src_ioargs

    self.dest_io = dest_io
    self.dest_ioargs = dest_ioargs

    self.finished_fn = finished_fn
967 class _DiskTransferPrivate(object):
968 def __init__(self, data, success, export_opts):
969 """Initializes this class.
971 @type data: L{DiskTransfer}
976 self.success = success
977 self.export_opts = export_opts
979 self.src_export = None
980 self.dest_import = None
982 def RecordResult(self, success):
983 """Updates the status.
985 One failed part will cause the whole transfer to fail.
988 self.success = self.success and success
def _GetInstDiskMagic(base, instance_name, index):
  """Computes the magic value for a disk export or import.

  @type base: string
  @param base: Random seed value (can be the same for all disks of a transfer)
  @type instance_name: string
  @param instance_name: Name of instance
  @type index: number
  @param index: Disk index

  """
  h = compat.sha1_hash()
  h.update(str(constants.RIE_VERSION))
  # The random seed must contribute to the hash, otherwise all transfers of
  # the same instance/index would share a magic value
  h.update(base)
  h.update(instance_name)
  h.update(str(index))
  return h.hexdigest()
def TransferInstanceData(lu, feedback_fn, src_node, dest_node, dest_ip,
                         instance, all_transfers):
  """Transfers an instance's data from one node to another.

  @param lu: Logical unit instance
  @param feedback_fn: Feedback function
  @type src_node: string
  @param src_node: Source node name
  @type dest_node: string
  @param dest_node: Destination node name
  @type dest_ip: string
  @param dest_ip: IP address of destination node
  @type instance: L{objects.Instance}
  @param instance: Instance object
  @type all_transfers: list of L{DiskTransfer} instances
  @param all_transfers: List of all disk transfers to be made
  @rtype: list
  @return: List with a boolean (True=successful, False=failed) for success for
           each transfer

  """
  # Disable compression for all moves as these are all within the same cluster
  compress = constants.IEC_NONE

  logging.debug("Source node %s, destination node %s, compression '%s'",
                src_node, dest_node, compress)

  timeouts = ImportExportTimeouts(constants.DISK_TRANSFER_CONNECT_TIMEOUT)
  src_cbs = _TransferInstSourceCb(lu, feedback_fn, instance, timeouts,
                                  src_node, None, dest_node, dest_ip)
  dest_cbs = _TransferInstDestCb(lu, feedback_fn, instance, timeouts,
                                 src_node, src_cbs, dest_node, dest_ip)

  all_dtp = []

  base_magic = utils.GenerateSecret(6)

  ieloop = ImportExportLoop(lu)
  try:
    for idx, transfer in enumerate(all_transfers):
      if transfer:
        feedback_fn("Exporting %s from %s to %s" %
                    (transfer.name, src_node, dest_node))

        magic = _GetInstDiskMagic(base_magic, instance.name, idx)
        opts = objects.ImportExportOptions(key_name=None, ca_pem=None,
                                           compress=compress, magic=magic)

        dtp = _DiskTransferPrivate(transfer, True, opts)

        di = DiskImport(lu, dest_node, opts, instance, "disk%d" % idx,
                        transfer.dest_io, transfer.dest_ioargs,
                        timeouts, dest_cbs, private=dtp)
        ieloop.Add(di)

        dtp.dest_import = di
      else:
        dtp = _DiskTransferPrivate(None, False, None)

      all_dtp.append(dtp)

    ieloop.Run()
  finally:
    ieloop.FinalizeAll()

  assert len(all_dtp) == len(all_transfers)

  assert compat.all((dtp.src_export is None or
                     dtp.src_export.success is not None) and
                    (dtp.dest_import is None or
                     dtp.dest_import.success is not None)
                    for dtp in all_dtp), \
    "Not all imports/exports are finalized"

  return [bool(dtp.success) for dtp in all_dtp]
class _RemoteExportCb(ImportExportCbBase):
  """Callbacks for an inter-cluster instance export.

  """
  def __init__(self, feedback_fn, disk_count):
    """Initializes this class.

    @param feedback_fn: Feedback function
    @type disk_count: number
    @param disk_count: Number of disks

    """
    ImportExportCbBase.__init__(self)
    self._feedback_fn = feedback_fn
    self._dresults = [None] * disk_count

  @property
  def disk_results(self):
    """Returns per-disk results.

    """
    return self._dresults

  def ReportConnected(self, ie, private):
    """Called when a connection has been established.

    """
    (idx, _) = private

    self._feedback_fn("Disk %s is now sending data" % idx)

  def ReportProgress(self, ie, private):
    """Called when new progress information should be reported.

    """
    (idx, _) = private

    progress = ie.progress
    if not progress:
      return

    self._feedback_fn("Disk %s sent %s" % (idx, FormatProgress(progress)))

  def ReportFinished(self, ie, private):
    """Called when a transfer has finished.

    """
    (idx, finished_fn) = private

    if ie.success:
      self._feedback_fn("Disk %s finished sending data" % idx)
    else:
      self._feedback_fn("Disk %s failed to send data: %s (recent output: %s)" %
                        (idx, ie.final_message, ie.recent_output))

    self._dresults[idx] = bool(ie.success)

    if finished_fn:
      finished_fn()
class ExportInstanceHelper:
  """Utilities for exporting an instance, locally or to a remote cluster.

  """
  def __init__(self, lu, feedback_fn, instance):
    """Initializes this class.

    @param lu: Logical unit instance
    @param feedback_fn: Feedback function
    @type instance: L{objects.Instance}
    @param instance: Instance object

    """
    self._lu = lu
    self._feedback_fn = feedback_fn
    self._instance = instance

    self._snap_disks = []
    self._removed_snaps = [False] * len(instance.disks)

  def CreateSnapshots(self):
    """Creates an LVM snapshot for every disk of the instance.

    """
    assert not self._snap_disks

    instance = self._instance
    src_node = instance.primary_node

    for idx, disk in enumerate(instance.disks):
      self._feedback_fn("Creating a snapshot of disk/%s on node %s" %
                        (idx, src_node))

      # result.payload will be a snapshot of an lvm leaf of the one we
      # passed
      result = self._lu.rpc.call_blockdev_snapshot(src_node, disk)
      new_dev = False
      msg = result.fail_msg
      if msg:
        self._lu.LogWarning("Could not snapshot disk/%s on node %s: %s",
                            idx, src_node, msg)
      elif (not isinstance(result.payload, (tuple, list)) or
            len(result.payload) != 2):
        self._lu.LogWarning("Could not snapshot disk/%s on node %s: invalid"
                            " result '%s'", idx, src_node, result.payload)
      else:
        disk_id = tuple(result.payload)
        new_dev = objects.Disk(dev_type=constants.LD_LV, size=disk.size,
                               logical_id=disk_id, physical_id=disk_id,
                               iv_name=disk.iv_name)

      # Failed snapshots are recorded as False so disk indexes stay aligned
      self._snap_disks.append(new_dev)

    assert len(self._snap_disks) == len(instance.disks)
    assert len(self._removed_snaps) == len(instance.disks)

  def _RemoveSnapshot(self, disk_index):
    """Removes an LVM snapshot.

    @type disk_index: number
    @param disk_index: Index of the snapshot to be removed

    """
    disk = self._snap_disks[disk_index]
    if disk and not self._removed_snaps[disk_index]:
      src_node = self._instance.primary_node

      self._feedback_fn("Removing snapshot of disk/%s on node %s" %
                        (disk_index, src_node))

      result = self._lu.rpc.call_blockdev_remove(src_node, disk)
      if result.fail_msg:
        self._lu.LogWarning("Could not remove snapshot for disk/%d from node"
                            " %s: %s", disk_index, src_node, result.fail_msg)
      else:
        self._removed_snaps[disk_index] = True

  def LocalExport(self, dest_node):
    """Intra-cluster instance export.

    @type dest_node: L{objects.Node}
    @param dest_node: Destination node

    """
    instance = self._instance
    src_node = instance.primary_node

    assert len(self._snap_disks) == len(instance.disks)

    transfers = []

    for idx, dev in enumerate(self._snap_disks):
      if not dev:
        transfers.append(None)
        continue

      path = utils.PathJoin(constants.EXPORT_DIR, "%s.new" % instance.name,
                            dev.physical_id[1])

      finished_fn = compat.partial(self._TransferFinished, idx)

      # FIXME: pass debug option from opcode to backend
      dt = DiskTransfer("snapshot/%s" % idx,
                        constants.IEIO_SCRIPT, (dev, idx),
                        constants.IEIO_FILE, (path, ),
                        finished_fn)
      transfers.append(dt)

    # Actually export data
    dresults = TransferInstanceData(self._lu, self._feedback_fn,
                                    src_node, dest_node.name,
                                    dest_node.secondary_ip,
                                    instance, transfers)

    assert len(dresults) == len(instance.disks)

    self._feedback_fn("Finalizing export on %s" % dest_node.name)
    result = self._lu.rpc.call_finalize_export(dest_node.name, instance,
                                               self._snap_disks)
    msg = result.fail_msg
    fin_resu = not msg
    if msg:
      self._lu.LogWarning("Could not finalize export for instance %s"
                          " on node %s: %s", instance.name, dest_node.name, msg)

    return (fin_resu, dresults)

  def RemoteExport(self, disk_info, key_name, dest_ca_pem, timeouts):
    """Inter-cluster instance export.

    @type disk_info: list
    @param disk_info: Per-disk destination information
    @type key_name: string
    @param key_name: Name of X509 key to use
    @type dest_ca_pem: string
    @param dest_ca_pem: Destination X509 CA in PEM format
    @type timeouts: L{ImportExportTimeouts}
    @param timeouts: Timeouts for this import

    """
    instance = self._instance

    assert len(disk_info) == len(instance.disks)

    cbs = _RemoteExportCb(self._feedback_fn, len(instance.disks))

    ieloop = ImportExportLoop(self._lu)
    try:
      for idx, (dev, (host, port, magic)) in enumerate(zip(instance.disks,
                                                           disk_info)):
        # Decide whether to use IPv6
        ipv6 = netutils.IP6Address.IsValid(host)

        opts = objects.ImportExportOptions(key_name=key_name,
                                           ca_pem=dest_ca_pem,
                                           magic=magic, ipv6=ipv6)

        self._feedback_fn("Sending disk %s to %s:%s" % (idx, host, port))
        finished_fn = compat.partial(self._TransferFinished, idx)
        ieloop.Add(DiskExport(self._lu, instance.primary_node,
                              opts, host, port, instance, "disk%d" % idx,
                              constants.IEIO_SCRIPT, (dev, idx),
                              timeouts, cbs, private=(idx, finished_fn)))

      ieloop.Run()
    finally:
      ieloop.FinalizeAll()

    return (True, cbs.disk_results)

  def _TransferFinished(self, idx):
    """Called once a transfer has finished.

    @type idx: number
    @param idx: Disk index

    """
    logging.debug("Transfer %s finished", idx)
    self._RemoveSnapshot(idx)

  def Cleanup(self):
    """Remove all snapshots.

    """
    assert len(self._removed_snaps) == len(self._instance.disks)
    for idx in range(len(self._instance.disks)):
      self._RemoveSnapshot(idx)
class _RemoteImportCb(ImportExportCbBase):
  """Callbacks for an inter-cluster instance import.

  """
  def __init__(self, feedback_fn, cds, x509_cert_pem, disk_count,
               external_address):
    """Initializes this class.

    @param feedback_fn: Feedback function
    @param cds: Cluster domain secret
    @type x509_cert_pem: string
    @param x509_cert_pem: CA used for signing import key
    @type disk_count: number
    @param disk_count: Number of disks
    @type external_address: string
    @param external_address: External address of destination node

    """
    ImportExportCbBase.__init__(self)
    self._feedback_fn = feedback_fn
    self._cds = cds
    self._x509_cert_pem = x509_cert_pem
    self._disk_count = disk_count
    self._external_address = external_address

    self._dresults = [None] * disk_count
    self._daemon_port = [None] * disk_count

    self._salt = utils.GenerateSecret(8)

  @property
  def disk_results(self):
    """Returns per-disk results.

    """
    return self._dresults

  def _CheckAllListening(self):
    """Checks whether all daemons are listening.

    If all daemons are listening, the information is sent to the client.

    """
    if not compat.all(dp is not None for dp in self._daemon_port):
      return

    host = self._external_address

    disks = []
    for idx, (port, magic) in enumerate(self._daemon_port):
      disks.append(ComputeRemoteImportDiskInfo(self._cds, self._salt,
                                               idx, host, port, magic))

    assert len(disks) == self._disk_count

    self._feedback_fn(constants.ELOG_REMOTE_IMPORT, {
      "disks": disks,
      "x509_ca": self._x509_cert_pem,
      })

  def ReportListening(self, ie, private, _):
    """Called when daemon started listening.

    """
    (idx, ) = private

    self._feedback_fn("Disk %s is now listening" % idx)

    assert self._daemon_port[idx] is None

    self._daemon_port[idx] = (ie.listen_port, ie.magic)

    self._CheckAllListening()

  def ReportConnected(self, ie, private):
    """Called when a connection has been established.

    """
    (idx, ) = private

    self._feedback_fn("Disk %s is now receiving data" % idx)

  def ReportFinished(self, ie, private):
    """Called when a transfer has finished.

    """
    (idx, ) = private

    # Daemon is certainly no longer listening
    self._daemon_port[idx] = None

    if ie.success:
      self._feedback_fn("Disk %s finished receiving data" % idx)
    else:
      self._feedback_fn(("Disk %s failed to receive data: %s"
                         " (recent output: %s)") %
                        (idx, ie.final_message, ie.recent_output))

    self._dresults[idx] = bool(ie.success)
1424 def RemoteImport(lu, feedback_fn, instance, pnode, source_x509_ca,
1426 """Imports an instance from another cluster.
1428 @param lu: Logical unit instance
1429 @param feedback_fn: Feedback function
1430 @type instance: L{objects.Instance}
1431 @param instance: Instance object
1432 @type pnode: L{objects.Node}
1433 @param pnode: Primary node of instance as an object
1434 @type source_x509_ca: OpenSSL.crypto.X509
1435 @param source_x509_ca: Import source's X509 CA
1437 @param cds: Cluster domain secret
1438 @type timeouts: L{ImportExportTimeouts}
1439 @param timeouts: Timeouts for this import
1442 source_ca_pem = OpenSSL.crypto.dump_certificate(OpenSSL.crypto.FILETYPE_PEM,
1445 magic_base = utils.GenerateSecret(6)
1447 # Decide whether to use IPv6
1448 ipv6 = netutils.IP6Address.IsValid(pnode.primary_ip)
1451 result = lu.rpc.call_x509_cert_create(instance.primary_node,
1452 constants.RIE_CERT_VALIDITY)
1453 result.Raise("Can't create X509 key and certificate on %s" % result.node)
1455 (x509_key_name, x509_cert_pem) = result.payload
1458 x509_cert = OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_PEM,
1462 signed_x509_cert_pem = \
1463 utils.SignX509Certificate(x509_cert, cds, utils.GenerateSecret(8))
1465 cbs = _RemoteImportCb(feedback_fn, cds, signed_x509_cert_pem,
1466 len(instance.disks), pnode.primary_ip)
1468 ieloop = ImportExportLoop(lu)
1470 for idx, dev in enumerate(instance.disks):
1471 magic = _GetInstDiskMagic(magic_base, instance.name, idx)
1473 # Import daemon options
1474 opts = objects.ImportExportOptions(key_name=x509_key_name,
1475 ca_pem=source_ca_pem,
1476 magic=magic, ipv6=ipv6)
1478 ieloop.Add(DiskImport(lu, instance.primary_node, opts, instance,
1480 constants.IEIO_SCRIPT, (dev, idx),
1481 timeouts, cbs, private=(idx, )))
1485 ieloop.FinalizeAll()
1487 # Remove crypto key and certificate
1488 result = lu.rpc.call_x509_cert_remove(instance.primary_node, x509_key_name)
1489 result.Raise("Can't remove X509 key and certificate on %s" % result.node)
1491 return cbs.disk_results
def _GetImportExportHandshakeMessage(version):
  """Builds the text that both sides hash for the RIE handshake.

  @type version: number
  @param version: RIE protocol version
  @rtype: string

  """
  return ":".join([str(version), constants.RIE_HANDSHAKE])
def ComputeRemoteExportHandshake(cds):
  """Computes the remote import/export handshake.

  @type cds: string
  @param cds: Cluster domain secret
  @return: Tuple of (protocol version, HMAC digest, salt)

  """
  hmac_salt = utils.GenerateSecret(8)
  digest = utils.Sha1Hmac(cds,
                          _GetImportExportHandshakeMessage(
                            constants.RIE_VERSION),
                          salt=hmac_salt)
  return (constants.RIE_VERSION, digest, hmac_salt)
1515 def CheckRemoteExportHandshake(cds, handshake):
1516 """Checks the handshake of a remote import/export.
1519 @param cds: Cluster domain secret
1520 @type handshake: sequence
1521 @param handshake: Handshake sent by remote peer
1525 (version, hmac_digest, hmac_salt) = handshake
1526 except (TypeError, ValueError), err:
1527 return "Invalid data: %s" % err
1529 if not utils.VerifySha1Hmac(cds, _GetImportExportHandshakeMessage(version),
1530 hmac_digest, salt=hmac_salt):
1531 return "Hash didn't match, clusters don't share the same domain secret"
1533 if version != constants.RIE_VERSION:
1534 return ("Clusters don't have the same remote import/export protocol"
1535 " (local=%s, remote=%s)" %
1536 (constants.RIE_VERSION, version))
1541 def _GetRieDiskInfoMessage(disk_index, host, port, magic):
1542 """Returns the hashed text for import/export disk information.
1544 @type disk_index: number
1545 @param disk_index: Index of disk (included in hash)
1547 @param host: Hostname
1549 @param port: Daemon port
1551 @param magic: Magic value
1554 return "%s:%s:%s:%s" % (disk_index, host, port, magic)
1557 def CheckRemoteExportDiskInfo(cds, disk_index, disk_info):
1558 """Verifies received disk information for an export.
1561 @param cds: Cluster domain secret
1562 @type disk_index: number
1563 @param disk_index: Index of disk (included in hash)
1564 @type disk_info: sequence
1565 @param disk_info: Disk information sent by remote peer
1569 (host, port, magic, hmac_digest, hmac_salt) = disk_info
1570 except (TypeError, ValueError), err:
1571 raise errors.GenericError("Invalid data: %s" % err)
1573 if not (host and port and magic):
1574 raise errors.GenericError("Missing destination host, port or magic")
1576 msg = _GetRieDiskInfoMessage(disk_index, host, port, magic)
1578 if not utils.VerifySha1Hmac(cds, msg, hmac_digest, salt=hmac_salt):
1579 raise errors.GenericError("HMAC is wrong")
1581 if netutils.IP6Address.IsValid(host) or netutils.IP4Address.IsValid(host):
1584 destination = netutils.Hostname.GetNormalizedName(host)
1586 return (destination,
1587 utils.ValidateServiceName(port),
def ComputeRemoteImportDiskInfo(cds, salt, disk_index, host, port, magic):
  """Computes the signed disk information for a remote import.

  @type cds: string
  @param cds: Cluster domain secret
  @type salt: string
  @param salt: HMAC salt
  @type disk_index: number
  @param disk_index: Index of disk (included in hash)
  @type host: string
  @param host: Hostname
  @type port: number
  @param port: Daemon port
  @type magic: string
  @param magic: Magic value
  @return: Tuple of (host, port, magic, HMAC digest, salt)

  """
  digest = utils.Sha1Hmac(cds,
                          _GetRieDiskInfoMessage(disk_index, host, port,
                                                 magic),
                          salt=salt)
  return (host, port, magic, digest, salt)