4 # Copyright (C) 2010 Google Inc.
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 # General Public License for more details.
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
22 """Instance-related functions and classes for masterd.
30 from ganeti import constants
31 from ganeti import errors
32 from ganeti import compat
33 from ganeti import utils
34 from ganeti import objects
class _ImportExportError(Exception):
  """Raised internally to signal a failed disk import/export.

  Code in this module catches this exception and converts it into a
  finalized (failed) transfer; it is not part of the public error API.

  """
class ImportExportTimeouts(object):
  """Container for all timeouts used by a disk import/export.

  """
  #: Time until daemon starts writing status file
  DEFAULT_READY_TIMEOUT = 10

  #: Length of time until errors cause hard failure
  DEFAULT_ERROR_TIMEOUT = 10

  #: Time after which daemon must be listening
  DEFAULT_LISTEN_TIMEOUT = 10

  #: Progress update interval
  DEFAULT_PROGRESS_INTERVAL = 60

  def __init__(self, connect,
               listen=DEFAULT_LISTEN_TIMEOUT,
               error=DEFAULT_ERROR_TIMEOUT,
               ready=DEFAULT_READY_TIMEOUT,
               progress=DEFAULT_PROGRESS_INTERVAL):
    """Initializes this class.

    @type connect: number
    @param connect: Timeout for establishing connection
    @type listen: number
    @param listen: Timeout for starting to listen for connections
    @type error: number
    @param error: Length of time until errors cause hard failure
    @type ready: number
    @param ready: Timeout for daemon to become ready
    @type progress: number
    @param progress: Progress update interval

    """
    # Store every timeout; the previous code kept only "connect" and
    # "progress", silently discarding "listen", "error" and "ready" even
    # though other code in this module reads self._timeouts.listen,
    # self._timeouts.error and self._timeouts.ready.
    self.connect = connect
    self.listen = listen
    self.error = error
    self.ready = ready
    self.progress = progress
class ImportExportCbBase(object):
  """Base class for disk import/export callbacks.

  All hooks are no-ops here; subclasses override the events they care
  about. Each hook receives the import/export object and the private
  data that was registered with it.

  """
  def ReportListening(self, ie, private):
    """Invoked once the daemon has started listening.

    @type ie: Subclass of L{_DiskImportExportBase}
    @param ie: Import/export object
    @param private: Private data passed to import/export object

    """

  def ReportConnected(self, ie, private):
    """Invoked once a connection has been established.

    @type ie: Subclass of L{_DiskImportExportBase}
    @param ie: Import/export object
    @param private: Private data passed to import/export object

    """

  def ReportProgress(self, ie, private):
    """Invoked when fresh progress information is available.

    @type ie: Subclass of L{_DiskImportExportBase}
    @param ie: Import/export object
    @param private: Private data passed to import/export object

    """

  def ReportFinished(self, ie, private):
    """Invoked once the transfer has finished.

    @type ie: Subclass of L{_DiskImportExportBase}
    @param ie: Import/export object
    @param private: Private data passed to import/export object

    """
def _TimeoutExpired(epoch, timeout, _time_fn=time.time):
  """Tells whether more than C{timeout} seconds have passed since C{epoch}.

  @param epoch: Start time, in seconds since the epoch
  @param timeout: Timeout length, in seconds
  @param _time_fn: Time function, overridable for tests
  @rtype: bool

  """
  deadline = epoch + timeout
  return deadline < _time_fn()
class _DiskImportExportBase(object):
  # Common machinery shared by DiskImport and DiskExport; subclasses must
  # provide MODE_TEXT and implement _StartDaemon, CheckListening and
  # _GetConnectedCheckEpoch.
  # NOTE(review): this excerpt is missing several lines (decorators,
  # some assignments and branch bodies) — verify against the full source.
  def __init__(self, lu, node_name, opts,
               instance, timeouts, cbs, private=None):
    """Initializes this class.

    @param lu: Logical unit instance
    @type node_name: string
    @param node_name: Node name for import
    @type opts: L{objects.ImportExportOptions}
    @param opts: Import/export daemon options
    @type instance: L{objects.Instance}
    @param instance: Instance object
    @type timeouts: L{ImportExportTimeouts}
    @param timeouts: Timeouts for this import
    @type cbs: L{ImportExportCbBase}
    @param cbs: Callbacks
    @param private: Private data for callback functions

    """
    # Subclasses must define MODE_TEXT before this base initializer runs
    assert self.MODE_TEXT

    self.node_name = node_name
    self._instance = instance
    self._timeouts = timeouts
    self._private = private

    # Phase timestamps; each stays None until its phase is reached
    self._ts_begin = None
    self._ts_connected = None
    self._ts_finished = None
    self._ts_cleanup = None
    self._ts_last_progress = None
    self._ts_last_error = None

    # Final status message reported when the transfer ends
    self.final_message = None

    self._daemon_name = None

  def recent_output(self):
    """Returns the most recent output from the daemon.

    """
    return self._daemon.recent_output

    """Returns transfer progress information.

    """
    return (self._daemon.progress_mbytes,
            self._daemon.progress_throughput,
            self._daemon.progress_percent,
            self._daemon.progress_eta)

    """Returns the magic value for this import/export.

    """
    return self._opts.magic

    """Determines whether this transport is still active.

    """
    return self.success is None

    """Returns parent loop.

    @rtype: L{ImportExportLoop}

    """

  def SetLoop(self, loop):
    """Sets the parent loop.

    @type loop: L{ImportExportLoop}

    """
    raise errors.ProgrammerError("Loop can only be set once")

  def _StartDaemon(self):
    """Starts the import/export daemon.

    """
    raise NotImplementedError()

  def CheckDaemon(self):
    """Checks whether daemon has been started and if not, starts it.

    """
    assert self._ts_cleanup is None

    if self._daemon_name is None:
      assert self._ts_begin is None

      result = self._StartDaemon()
      raise _ImportExportError("Failed to start %s on %s: %s" %
                               (self.MODE_TEXT, self.node_name,
      daemon_name = result.payload

      logging.info("Started %s %r on %s", self.MODE_TEXT, daemon_name,
      self._ts_begin = time.time()
      self._daemon_name = daemon_name

    return self._daemon_name

  def GetDaemonName(self):
    """Returns the daemon name.

    """
    assert self._daemon_name, "Daemon has not been started"
    assert self._ts_cleanup is None
    return self._daemon_name

    """Sends SIGTERM to import/export daemon (if still active).

    """
    if self._daemon_name:
      self._lu.LogWarning("Aborting %s %r on %s",
                          self.MODE_TEXT, self._daemon_name, self.node_name)
      result = self._lu.rpc.call_impexp_abort(self.node_name, self._daemon_name)
      self._lu.LogWarning("Failed to abort %s %r on %s: %s",
                          self.MODE_TEXT, self._daemon_name,
                          self.node_name, result.fail_msg)

  def _SetDaemonData(self, data):
    """Internal function for updating status daemon data.

    @type data: L{objects.ImportExportStatus}
    @param data: Daemon status data

    """
    assert self._ts_begin is not None

    # No status file written yet within the "ready" window: hard failure
    if _TimeoutExpired(self._ts_begin, self._timeouts.ready):
      raise _ImportExportError("Didn't become ready after %s seconds" %
                               self._timeouts.ready)

  def SetDaemonData(self, success, data):
    """Updates daemon status data.

    @param success: Whether fetching data was successful or not
    @type data: L{objects.ImportExportStatus}
    @param data: Daemon status data

    """
    # Track when errors started; repeated errors past the "error"
    # timeout abort the transfer
    if self._ts_last_error is None:
      self._ts_last_error = time.time()

    elif _TimeoutExpired(self._ts_last_error, self._timeouts.error):
      raise _ImportExportError("Too many errors while updating data")

    self._ts_last_error = None

    return self._SetDaemonData(data)

  def CheckListening(self):
    """Checks whether the daemon is listening.

    """
    raise NotImplementedError()

  def _GetConnectedCheckEpoch(self):
    """Returns timeout to calculate connect timeout.

    """
    raise NotImplementedError()

  def CheckConnected(self):
    """Checks whether the daemon is connected.

    @return: Whether the daemon is connected

    """
    assert self._daemon, "Daemon status missing"

    if self._ts_connected is not None:

    if self._daemon.connected:
      self._ts_connected = time.time()

      # TODO: Log remote peer
      logging.debug("%s %r on %s is now connected",
                    self.MODE_TEXT, self._daemon_name, self.node_name)

      self._cbs.ReportConnected(self, self._private)

    if _TimeoutExpired(self._GetConnectedCheckEpoch(), self._timeouts.connect):
      raise _ImportExportError("Not connected after %s seconds" %
                               self._timeouts.connect)

  def _CheckProgress(self):
    """Checks whether a progress update should be reported.

    """
    # Only report when the interval has elapsed and the daemon has
    # actually produced progress data
    if ((self._ts_last_progress is None or
         _TimeoutExpired(self._ts_last_progress, self._timeouts.progress)) and
        self._daemon.progress_mbytes is not None and
        self._daemon.progress_throughput is not None):
      self._cbs.ReportProgress(self, self._private)
      self._ts_last_progress = time.time()

  def CheckFinished(self):
    """Checks whether the daemon exited.

    @return: Whether the transfer is finished

    """
    assert self._daemon, "Daemon status missing"

    if self._ts_finished:

    if self._daemon.exit_status is None:
      # TODO: Adjust delay for ETA expiring soon
      self._CheckProgress()

    self._ts_finished = time.time()

    # An exit status of zero is treated as success
    self._ReportFinished(self._daemon.exit_status == 0,
                         self._daemon.error_message)

  def _ReportFinished(self, success, message):
    """Transfer is finished or daemon exited.

    @param success: Whether the transfer was successful
    @type message: string
    @param message: Error message

    """
    assert self.success is None

    self.success = success
    self.final_message = message

    logging.info("%s %r on %s succeeded", self.MODE_TEXT, self._daemon_name,
    elif self._daemon_name:
      self._lu.LogWarning("%s %r on %s failed: %s",
                          self.MODE_TEXT, self._daemon_name, self.node_name,
      self._lu.LogWarning("%s on %s failed: %s", self.MODE_TEXT,
                          self.node_name, message)

    self._cbs.ReportFinished(self, self._private)

    """Makes the RPC call to finalize this import/export.

    """
    return self._lu.rpc.call_impexp_cleanup(self.node_name, self._daemon_name)

  def Finalize(self, error=None):
    """Finalizes this import/export.

    """
    if self._daemon_name:
      logging.info("Finalizing %s %r on %s",
                   self.MODE_TEXT, self._daemon_name, self.node_name)

      result = self._Finalize()
      self._lu.LogWarning("Failed to finalize %s %r on %s: %s",
                          self.MODE_TEXT, self._daemon_name,
                          self.node_name, result.fail_msg)

      # Daemon is no longer running
      self._daemon_name = None
      self._ts_cleanup = time.time()

    self._ReportFinished(False, error)
class DiskImport(_DiskImportExportBase):
  # Receiving side of a disk transfer: starts an import daemon on
  # node_name and waits for it to listen, connect and finish.
  # NOTE(review): excerpt is missing lines (e.g. property decorators,
  # some branch bodies) — verify against the full source.
  def __init__(self, lu, node_name, opts, instance,
               dest, dest_args, timeouts, cbs, private=None):
    """Initializes this class.

    @param lu: Logical unit instance
    @type node_name: string
    @param node_name: Node name for import
    @type opts: L{objects.ImportExportOptions}
    @param opts: Import/export daemon options
    @type instance: L{objects.Instance}
    @param instance: Instance object
    @param dest: I/O destination
    @param dest_args: I/O arguments
    @type timeouts: L{ImportExportTimeouts}
    @param timeouts: Timeouts for this import
    @type cbs: L{ImportExportCbBase}
    @param cbs: Callbacks
    @param private: Private data for callback functions

    """
    _DiskImportExportBase.__init__(self, lu, node_name, opts,
                                   instance, timeouts, cbs, private)
    self._dest_args = dest_args

    # Timestamp of when the daemon started listening
    self._ts_listening = None

  def listen_port(self):
    """Returns the port the daemon is listening on.

    """
    return self._daemon.listen_port

  def _StartDaemon(self):
    """Starts the import daemon.

    """
    return self._lu.rpc.call_import_start(self.node_name, self._opts,
                                          self._dest, self._dest_args)

  def CheckListening(self):
    """Checks whether the daemon is listening.

    @return: Whether the daemon is listening

    """
    assert self._daemon, "Daemon status missing"

    if self._ts_listening is not None:

    port = self._daemon.listen_port
    self._ts_listening = time.time()

    logging.debug("Import %r on %s is now listening on port %s",
                  self._daemon_name, self.node_name, port)

    self._cbs.ReportListening(self, self._private)

    # Not listening within the "listen" window: hard failure
    if _TimeoutExpired(self._ts_begin, self._timeouts.listen):
      raise _ImportExportError("Not listening after %s seconds" %
                               self._timeouts.listen)

  def _GetConnectedCheckEpoch(self):
    """Returns the time since we started listening.

    """
    assert self._ts_listening is not None, \
      ("Checking whether an import is connected is only useful"
       " once it's been listening")

    return self._ts_listening
class DiskExport(_DiskImportExportBase):
  # Sending side of a disk transfer: starts an export daemon on
  # node_name which connects to the destination host/port.
  def __init__(self, lu, node_name, opts,
               dest_host, dest_port, instance, source, source_args,
               timeouts, cbs, private=None):
    """Initializes this class.

    @param lu: Logical unit instance
    @type node_name: string
    @param node_name: Node name for export
    @type opts: L{objects.ImportExportOptions}
    @param opts: Import/export daemon options
    @type dest_host: string
    @param dest_host: Destination host name or IP address
    @type dest_port: number
    @param dest_port: Destination port number
    @type instance: L{objects.Instance}
    @param instance: Instance object
    @param source: I/O source
    @param source_args: I/O source arguments
    @type timeouts: L{ImportExportTimeouts}
    @param timeouts: Timeouts for this export
    @type cbs: L{ImportExportCbBase}
    @param cbs: Callbacks
    @param private: Private data for callback functions

    """
    _DiskImportExportBase.__init__(self, lu, node_name, opts,
                                   instance, timeouts, cbs, private)
    self._dest_host = dest_host
    self._dest_port = dest_port
    self._source = source
    self._source_args = source_args

  def _StartDaemon(self):
    """Starts the export daemon.

    """
    return self._lu.rpc.call_export_start(self.node_name, self._opts,
                                          self._dest_host, self._dest_port,
                                          self._instance, self._source,

  def CheckListening(self):
    """Checks whether the daemon is listening.

    """
    # Only an import can be listening

  def _GetConnectedCheckEpoch(self):
    """Returns the time since the daemon started.

    """
    assert self._ts_begin is not None

    return self._ts_begin
def FormatProgress(progress):
  """Formats progress information for user consumption.

  @type progress: tuple
  @param progress: Tuple of (mebibytes transferred, throughput in MiB/s,
    percentage done or C{None}, ETA in seconds or C{None})
  @rtype: string

  """
  (done_mbytes, mib_per_sec, done_percent, eta_seconds) = progress

  # Amount transferred and throughput are always shown
  # (not using FormatUnit for throughput as it doesn't support kilobytes)
  summary = [utils.FormatUnit(done_mbytes, "h"),
             "%0.1f MiB/s" % mib_per_sec]

  # Percentage and ETA are optional and only shown when known
  if done_percent is not None:
    summary.append("%d%%" % done_percent)

  if eta_seconds is not None:
    summary.append("ETA %s" % utils.FormatSeconds(eta_seconds))

  return utils.CommaJoin(summary)
class ImportExportLoop:
  # Drives a set of import/export objects until all of them have
  # finished, polling daemon status via RPC.
  # NOTE(review): excerpt is missing lines (class constants such as
  # MIN_DELAY/MAX_DELAY, method headers, "try:" lines, decorators) —
  # verify against the full source.
  def __init__(self, lu):
    """Initializes this class.

    """
    # Staging list for objects added while the main loop runs
    self._pending_add = []

  def Add(self, diskie):
    """Adds an import/export object to the loop.

    @type diskie: Subclass of L{_DiskImportExportBase}
    @param diskie: Import/export object

    """
    assert diskie not in self._pending_add
    assert diskie.loop is None

    # Adding new objects to a staging list is necessary, otherwise the main
    # loop gets confused if callbacks modify the queue while the main loop is
    # iterating over it
    self._pending_add.append(diskie)

  def _CollectDaemonStatus(lu, daemons):
    """Collects the status for all import/export daemons.

    @param lu: Logical unit instance
    @param daemons: Mapping of node name to list of daemon names

    """
    for node_name, names in daemons.iteritems():
      result = lu.rpc.call_impexp_status(node_name, names)
      lu.LogWarning("Failed to get daemon status on %s: %s",
                    node_name, result.fail_msg)
      assert len(names) == len(result.payload)

      daemon_status[node_name] = dict(zip(names, result.payload))

  def _GetActiveDaemonNames(queue):
    """Gets the names of all active daemons.

    @rtype: dict
    @return: Mapping of node name to list of daemon names

    """
      if not diskie.active:

        # Start daemon if necessary
        daemon_name = diskie.CheckDaemon()
      except _ImportExportError, err:
        logging.exception("%s failed", diskie.MODE_TEXT)
        diskie.Finalize(error=str(err))

      result.setdefault(diskie.node_name, []).append(daemon_name)

    assert len(queue) >= len(result)
    assert len(queue) >= sum([len(names) for names in result.itervalues()])

    logging.debug("daemons=%r", result)

  def _AddPendingToQueue(self):
    """Adds all pending import/export objects to the internal queue.

    """
    assert compat.all(diskie not in self._queue and diskie.loop == self
                      for diskie in self._pending_add)

    self._queue.extend(self._pending_add)

    del self._pending_add[:]

    """Utility main loop.

    """
    self._AddPendingToQueue()

    # Collect all active daemon names
    daemons = self._GetActiveDaemonNames(self._queue)

    # Collection daemon status data
    data = self._CollectDaemonStatus(self._lu, daemons)

    # Start with the maximum delay and shorten it as needed below
    delay = self.MAX_DELAY
    for diskie in self._queue:
      if not diskie.active:

        all_daemon_data = data[diskie.node_name]

        result = diskie.SetDaemonData(False, None)
        diskie.SetDaemonData(True,
                             all_daemon_data[diskie.GetDaemonName()])

        # Daemon not yet ready, retry soon
        delay = min(3.0, delay)

        if diskie.CheckFinished():

        # Normal case: check again in 5 seconds
        delay = min(5.0, delay)

        if not diskie.CheckListening():
          # Not yet listening, retry soon
          delay = min(1.0, delay)

        if not diskie.CheckConnected():
          # Not yet connected, retry soon
          delay = min(1.0, delay)

      except _ImportExportError, err:
        logging.exception("%s failed", diskie.MODE_TEXT)
        diskie.Finalize(error=str(err))

    if not compat.any([diskie.active for diskie in self._queue]):

    # Clamp the computed delay into [MIN_DELAY, MAX_DELAY]
    delay = min(self.MAX_DELAY, max(self.MIN_DELAY, delay))
    logging.debug("Waiting for %ss", delay)

  def FinalizeAll(self):
    """Finalizes all pending transfers.

    """
    for diskie in self._queue:
      success = diskie.Finalize() and success
class _TransferInstCbBase(ImportExportCbBase):
  # Shared state for intra-cluster transfer callbacks; the source and
  # destination callback classes below both derive from this.
  # NOTE(review): the signature continuation line (presumably
  # "dest_node, dest_ip):") is missing from this excerpt.
  def __init__(self, lu, feedback_fn, instance, timeouts, src_node, src_cbs,
    """Initializes this class.

    @param lu: Logical unit instance
    @param feedback_fn: Feedback function
    @type instance: L{objects.Instance}
    @param instance: Instance object
    @type timeouts: L{ImportExportTimeouts}
    @param timeouts: Timeouts for the transfer
    @param src_node: Source node name
    @param src_cbs: Source callbacks (or C{None})

    """
    ImportExportCbBase.__init__(self)

    self.feedback_fn = feedback_fn
    self.instance = instance
    self.timeouts = timeouts
    self.src_node = src_node
    self.src_cbs = src_cbs
    self.dest_node = dest_node
    self.dest_ip = dest_ip
class _TransferInstSourceCb(_TransferInstCbBase):
  # Callbacks for the sending (export) side of an intra-cluster transfer;
  # the "private" argument is the shared _DiskTransferPrivate object.
  def ReportConnected(self, ie, dtp):
    """Called when a connection has been established.

    """
    assert self.src_cbs is None
    assert dtp.src_export == ie
    assert dtp.dest_import

    self.feedback_fn("%s is sending data on %s" %
                     (dtp.data.name, ie.node_name))

  def ReportProgress(self, ie, dtp):
    """Called when new progress information should be reported.

    """
    progress = ie.progress

    self.feedback_fn("%s sent %s" % (dtp.data.name, FormatProgress(progress)))

  def ReportFinished(self, ie, dtp):
    """Called when a transfer has finished.

    """
    assert self.src_cbs is None
    assert dtp.src_export == ie
    assert dtp.dest_import

    self.feedback_fn("%s finished sending data" % dtp.data.name)
    self.feedback_fn("%s failed to send data: %s (recent output: %r)" %
                     (dtp.data.name, ie.final_message, ie.recent_output))

    dtp.RecordResult(ie.success)

    cb = dtp.data.finished_fn

    # TODO: Check whether sending SIGTERM right away is okay, maybe we should
    # give the daemon a moment to sort things out
    if dtp.dest_import and not ie.success:
      dtp.dest_import.Abort()
class _TransferInstDestCb(_TransferInstCbBase):
  # Callbacks for the receiving (import) side of an intra-cluster
  # transfer; starts the matching export once the import is listening.
  def ReportListening(self, ie, dtp):
    """Called when daemon started listening.

    """
    assert dtp.src_export is None
    assert dtp.dest_import
    assert dtp.export_opts

    self.feedback_fn("%s is now listening, starting export" % dtp.data.name)

    # Start export on source node
    de = DiskExport(self.lu, self.src_node, dtp.export_opts,
                    self.dest_ip, ie.listen_port,
                    self.instance, dtp.data.src_io, dtp.data.src_ioargs,
                    self.timeouts, self.src_cbs, private=dtp)

  def ReportConnected(self, ie, dtp):
    """Called when a connection has been established.

    """
    self.feedback_fn("%s is receiving data on %s" %
                     (dtp.data.name, self.dest_node))

  def ReportFinished(self, ie, dtp):
    """Called when a transfer has finished.

    """
    self.feedback_fn("%s finished receiving data" % dtp.data.name)
    self.feedback_fn("%s failed to receive data: %s (recent output: %r)" %
                     (dtp.data.name, ie.final_message, ie.recent_output))

    dtp.RecordResult(ie.success)

    # TODO: Check whether sending SIGTERM right away is okay, maybe we should
    # give the daemon a moment to sort things out
    if dtp.src_export and not ie.success:
      dtp.src_export.Abort()
class DiskTransfer(object):
  """Description of a single disk transfer (source and destination I/O).

  """
  def __init__(self, name, src_io, src_ioargs, dest_io, dest_ioargs,
               finished_fn):
    """Initializes this class.

    @type name: string
    @param name: User-visible name for this transfer (e.g. "disk/0")
    @param src_io: Source I/O type
    @param src_ioargs: Source I/O arguments
    @param dest_io: Destination I/O type
    @param dest_ioargs: Destination I/O arguments
    @type finished_fn: callable
    @param finished_fn: Function called once transfer has finished

    """
    # Store all parameters; the previous code dropped "name" and
    # "src_io" even though callbacks read e.g. dtp.data.name and
    # dtp.data.src_io.
    self.name = name
    self.src_io = src_io
    self.src_ioargs = src_ioargs
    self.dest_io = dest_io
    self.dest_ioargs = dest_ioargs
    self.finished_fn = finished_fn
class _DiskTransferPrivate(object):
  """Per-transfer private data shared between source and destination
  callbacks.

  """
  def __init__(self, data, success, export_opts):
    """Initializes this class.

    @type data: L{DiskTransfer}
    @param data: Disk transfer description
    @type success: bool
    @param success: Initial success state
    @param export_opts: Options for the export daemon

    """
    # Keep the transfer description; the previous code never stored
    # "data" although the callbacks read e.g. dtp.data.name.
    self.data = data
    self.success = success
    self.export_opts = export_opts

    # Filled in later with the DiskExport/DiskImport objects
    self.src_export = None
    self.dest_import = None

  def RecordResult(self, success):
    """Updates the status.

    One failed part will cause the whole transfer to fail.

    """
    self.success = self.success and success
def _GetInstDiskMagic(base, instance_name, index):
  """Computes the magic value for a disk export or import.

  @type base: string
  @param base: Random seed value (can be the same for all disks of a transfer)
  @type instance_name: string
  @param instance_name: Name of instance
  @type index: number
  @param index: Disk index

  """
  h = compat.sha1_hash()
  # Mix in protocol version, seed, instance name and disk index so the
  # magic is unique per disk; the previous code never hashed "base" or
  # "index" and never returned a digest.
  h.update(str(constants.RIE_VERSION))
  h.update(base)
  h.update(instance_name)
  h.update(str(index))
  return h.hexdigest()
def TransferInstanceData(lu, feedback_fn, src_node, dest_node, dest_ip,
                         instance, all_transfers):
  """Transfers an instance's data from one node to another.

  @param lu: Logical unit instance
  @param feedback_fn: Feedback function
  @type src_node: string
  @param src_node: Source node name
  @type dest_node: string
  @param dest_node: Destination node name
  @type dest_ip: string
  @param dest_ip: IP address of destination node
  @type instance: L{objects.Instance}
  @param instance: Instance object
  @type all_transfers: list of L{DiskTransfer} instances
  @param all_transfers: List of all disk transfers to be made
  @rtype: list
  @return: List with a boolean (True=successful, False=failed) for success for
    each transfer

  """
  # Disable compression for all moves as these are all within the same cluster
  compress = constants.IEC_NONE

  logging.debug("Source node %s, destination node %s, compression '%s'",
                src_node, dest_node, compress)

  timeouts = ImportExportTimeouts(constants.DISK_TRANSFER_CONNECT_TIMEOUT)
  src_cbs = _TransferInstSourceCb(lu, feedback_fn, instance, timeouts,
                                  src_node, None, dest_node, dest_ip)
  dest_cbs = _TransferInstDestCb(lu, feedback_fn, instance, timeouts,
                                 src_node, src_cbs, dest_node, dest_ip)

  # Shared random seed for the per-disk magic values
  base_magic = utils.GenerateSecret(6)

  ieloop = ImportExportLoop(lu)
  for idx, transfer in enumerate(all_transfers):
    feedback_fn("Exporting %s from %s to %s" %
                (transfer.name, src_node, dest_node))

    magic = _GetInstDiskMagic(base_magic, instance.name, idx)
    opts = objects.ImportExportOptions(key_name=None, ca_pem=None,
                                       compress=compress, magic=magic)

    dtp = _DiskTransferPrivate(transfer, True, opts)

    # The import side is started first; the matching export is started
    # from the callbacks once the import daemon is listening
    di = DiskImport(lu, dest_node, opts, instance,
                    transfer.dest_io, transfer.dest_ioargs,
                    timeouts, dest_cbs, private=dtp)

    dtp.dest_import = di

    dtp = _DiskTransferPrivate(None, False)

  ieloop.FinalizeAll()

  assert len(all_dtp) == len(all_transfers)
  assert compat.all([(dtp.src_export is None or
                      dtp.src_export.success is not None) and
                     (dtp.dest_import is None or
                      dtp.dest_import.success is not None)
                     for dtp in all_dtp]), \
    "Not all imports/exports are finalized"

  return [bool(dtp.success) for dtp in all_dtp]
class _RemoteExportCb(ImportExportCbBase):
  # Callbacks for inter-cluster exports; "private" carries the tuple
  # (disk index, finished function).
  # NOTE(review): excerpt is missing lines (e.g. the unpacking of
  # "private" in some methods, decorators) — verify against full source.
  def __init__(self, feedback_fn, disk_count):
    """Initializes this class.

    @param feedback_fn: Feedback function
    @type disk_count: number
    @param disk_count: Number of disks

    """
    ImportExportCbBase.__init__(self)
    self._feedback_fn = feedback_fn
    # One result slot per disk, filled in by ReportFinished
    self._dresults = [None] * disk_count

  def disk_results(self):
    """Returns per-disk results.

    """
    return self._dresults

  def ReportConnected(self, ie, private):
    """Called when a connection has been established.

    """
    self._feedback_fn("Disk %s is now sending data" % idx)

  def ReportProgress(self, ie, private):
    """Called when new progress information should be reported.

    """
    progress = ie.progress

    self._feedback_fn("Disk %s sent %s" % (idx, FormatProgress(progress)))

  def ReportFinished(self, ie, private):
    """Called when a transfer has finished.

    """
    (idx, finished_fn) = private

    self._feedback_fn("Disk %s finished sending data" % idx)
    self._feedback_fn("Disk %s failed to send data: %s (recent output: %r)" %
                      (idx, ie.final_message, ie.recent_output))

    self._dresults[idx] = bool(ie.success)
class ExportInstanceHelper:
  # Helper driving a whole-instance export: snapshot the disks, then
  # transfer the snapshots either intra-cluster (LocalExport) or to a
  # remote cluster (RemoteExport), removing snapshots as they finish.
  def __init__(self, lu, feedback_fn, instance):
    """Initializes this class.

    @param lu: Logical unit instance
    @param feedback_fn: Feedback function
    @type instance: L{objects.Instance}
    @param instance: Instance object

    """
    self._feedback_fn = feedback_fn
    self._instance = instance

    # Snapshot disks and their removal state, indexed by disk index
    self._snap_disks = []
    self._removed_snaps = [False] * len(instance.disks)

  def CreateSnapshots(self):
    """Creates an LVM snapshot for every disk of the instance.

    """
    assert not self._snap_disks

    instance = self._instance
    src_node = instance.primary_node

    vgname = self._lu.cfg.GetVGName()

    for idx, disk in enumerate(instance.disks):
      self._feedback_fn("Creating a snapshot of disk/%s on node %s" %
      # result.payload will be a snapshot of an lvm leaf of the one we
      result = self._lu.rpc.call_blockdev_snapshot(src_node, disk)
      msg = result.fail_msg
      self._lu.LogWarning("Could not snapshot disk/%s on node %s: %s",
      disk_id = (vgname, result.payload)
      new_dev = objects.Disk(dev_type=constants.LD_LV, size=disk.size,
                             logical_id=disk_id, physical_id=disk_id,
                             iv_name=disk.iv_name)
      self._snap_disks.append(new_dev)

    assert len(self._snap_disks) == len(instance.disks)
    assert len(self._removed_snaps) == len(instance.disks)

  def _RemoveSnapshot(self, disk_index):
    """Removes an LVM snapshot.

    @type disk_index: number
    @param disk_index: Index of the snapshot to be removed

    """
    disk = self._snap_disks[disk_index]
    if disk and not self._removed_snaps[disk_index]:
      src_node = self._instance.primary_node

      self._feedback_fn("Removing snapshot of disk/%s on node %s" %
                        (disk_index, src_node))

      result = self._lu.rpc.call_blockdev_remove(src_node, disk)
      self._lu.LogWarning("Could not remove snapshot for disk/%d from node"
                          " %s: %s", disk_index, src_node, result.fail_msg)
      self._removed_snaps[disk_index] = True

  def LocalExport(self, dest_node):
    """Intra-cluster instance export.

    @type dest_node: L{objects.Node}
    @param dest_node: Destination node

    """
    instance = self._instance
    src_node = instance.primary_node

    assert len(self._snap_disks) == len(instance.disks)

    for idx, dev in enumerate(self._snap_disks):
      transfers.append(None)

      path = utils.PathJoin(constants.EXPORT_DIR, "%s.new" % instance.name,

      finished_fn = compat.partial(self._TransferFinished, idx)

      # FIXME: pass debug option from opcode to backend
      dt = DiskTransfer("snapshot/%s" % idx,
                        constants.IEIO_SCRIPT, (dev, idx),
                        constants.IEIO_FILE, (path, ),
      transfers.append(dt)

    # Actually export data
    dresults = TransferInstanceData(self._lu, self._feedback_fn,
                                    src_node, dest_node.name,
                                    dest_node.secondary_ip,
                                    instance, transfers)

    assert len(dresults) == len(instance.disks)

    self._feedback_fn("Finalizing export on %s" % dest_node.name)
    result = self._lu.rpc.call_finalize_export(dest_node.name, instance,
    msg = result.fail_msg
    self._lu.LogWarning("Could not finalize export for instance %s"
                        " on node %s: %s", instance.name, dest_node.name, msg)
    return (fin_resu, dresults)

  def RemoteExport(self, disk_info, key_name, dest_ca_pem, timeouts):
    """Inter-cluster instance export.

    @type disk_info: list
    @param disk_info: Per-disk destination information
    @type key_name: string
    @param key_name: Name of X509 key to use
    @type dest_ca_pem: string
    @param dest_ca_pem: Destination X509 CA in PEM format
    @type timeouts: L{ImportExportTimeouts}
    @param timeouts: Timeouts for this export

    """
    instance = self._instance

    assert len(disk_info) == len(instance.disks)

    cbs = _RemoteExportCb(self._feedback_fn, len(instance.disks))

    ieloop = ImportExportLoop(self._lu)
    for idx, (dev, (host, port, magic)) in enumerate(zip(instance.disks,
      opts = objects.ImportExportOptions(key_name=key_name,

      self._feedback_fn("Sending disk %s to %s:%s" % (idx, host, port))
      finished_fn = compat.partial(self._TransferFinished, idx)
      ieloop.Add(DiskExport(self._lu, instance.primary_node,
                            opts, host, port, instance,
                            constants.IEIO_SCRIPT, (dev, idx),
                            timeouts, cbs, private=(idx, finished_fn)))

    ieloop.FinalizeAll()

    return (True, cbs.disk_results)

  def _TransferFinished(self, idx):
    """Called once a transfer has finished.

    @type idx: number
    @param idx: Disk index

    """
    logging.debug("Transfer %s finished", idx)
    self._RemoveSnapshot(idx)

    """Remove all snapshots.

    """
    assert len(self._removed_snaps) == len(self._instance.disks)
    for idx in range(len(self._instance.disks)):
      self._RemoveSnapshot(idx)
class _RemoteImportCb(ImportExportCbBase):
  # Callbacks for inter-cluster imports; once all import daemons are
  # listening, the connection information is sent back to the client so
  # the remote side can start its exports.
  # NOTE(review): signature continuation line and some "private"
  # unpacking lines are missing from this excerpt.
  def __init__(self, feedback_fn, cds, x509_cert_pem, disk_count,
    """Initializes this class.

    @param feedback_fn: Feedback function
    @type cds: string
    @param cds: Cluster domain secret
    @type x509_cert_pem: string
    @param x509_cert_pem: CA used for signing import key
    @type disk_count: number
    @param disk_count: Number of disks
    @type external_address: string
    @param external_address: External address of destination node

    """
    ImportExportCbBase.__init__(self)
    self._feedback_fn = feedback_fn
    self._x509_cert_pem = x509_cert_pem
    self._disk_count = disk_count
    self._external_address = external_address

    # Per-disk results and (port, magic) pairs for listening daemons
    self._dresults = [None] * disk_count
    self._daemon_port = [None] * disk_count

    self._salt = utils.GenerateSecret(8)

  def disk_results(self):
    """Returns per-disk results.

    """
    return self._dresults

  def _CheckAllListening(self):
    """Checks whether all daemons are listening.

    If all daemons are listening, the information is sent to the client.

    """
    if not compat.all(dp is not None for dp in self._daemon_port):

    host = self._external_address

    for idx, (port, magic) in enumerate(self._daemon_port):
      disks.append(ComputeRemoteImportDiskInfo(self._cds, self._salt,
                                               idx, host, port, magic))

    assert len(disks) == self._disk_count

    self._feedback_fn(constants.ELOG_REMOTE_IMPORT, {
      "x509_ca": self._x509_cert_pem,

  def ReportListening(self, ie, private):
    """Called when daemon started listening.

    """
    self._feedback_fn("Disk %s is now listening" % idx)

    assert self._daemon_port[idx] is None

    self._daemon_port[idx] = (ie.listen_port, ie.magic)

    self._CheckAllListening()

  def ReportConnected(self, ie, private):
    """Called when a connection has been established.

    """
    self._feedback_fn("Disk %s is now receiving data" % idx)

  def ReportFinished(self, ie, private):
    """Called when a transfer has finished.

    """
    # Daemon is certainly no longer listening
    self._daemon_port[idx] = None

    self._feedback_fn("Disk %s finished receiving data" % idx)
    self._feedback_fn(("Disk %s failed to receive data: %s"
                       " (recent output: %r)") %
                      (idx, ie.final_message, ie.recent_output))

    self._dresults[idx] = bool(ie.success)
def RemoteImport(lu, feedback_fn, instance, source_x509_ca, cds, timeouts):
  """Imports an instance from another cluster.

  @param lu: Logical unit instance
  @param feedback_fn: Feedback function
  @type instance: L{objects.Instance}
  @param instance: Instance object
  @type source_x509_ca: OpenSSL.crypto.X509
  @param source_x509_ca: Import source's X509 CA
  @type cds: string
  @param cds: Cluster domain secret
  @type timeouts: L{ImportExportTimeouts}
  @param timeouts: Timeouts for this import

  """
  source_ca_pem = OpenSSL.crypto.dump_certificate(OpenSSL.crypto.FILETYPE_PEM,

  magic_base = utils.GenerateSecret(6)

  # Create a temporary X509 key/certificate pair on the destination for
  # this import
  result = lu.rpc.call_x509_cert_create(instance.primary_node,
                                        constants.RIE_CERT_VALIDITY)
  result.Raise("Can't create X509 key and certificate on %s" % result.node)

  (x509_key_name, x509_cert_pem) = result.payload

  x509_cert = OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_PEM,

  # Sign the certificate with the cluster domain secret so the source
  # cluster can verify it
  signed_x509_cert_pem = \
    utils.SignX509Certificate(x509_cert, cds, utils.GenerateSecret(8))

  cbs = _RemoteImportCb(feedback_fn, cds, signed_x509_cert_pem,
                        len(instance.disks), instance.primary_node)

  ieloop = ImportExportLoop(lu)
  for idx, dev in enumerate(instance.disks):
    magic = _GetInstDiskMagic(magic_base, instance.name, idx)

    # Import daemon options
    opts = objects.ImportExportOptions(key_name=x509_key_name,
                                       ca_pem=source_ca_pem,

    ieloop.Add(DiskImport(lu, instance.primary_node, opts, instance,
                          constants.IEIO_SCRIPT, (dev, idx),
                          timeouts, cbs, private=(idx, )))

  ieloop.FinalizeAll()

  # Remove crypto key and certificate
  result = lu.rpc.call_x509_cert_remove(instance.primary_node, x509_key_name)
  result.Raise("Can't remove X509 key and certificate on %s" % result.node)

  return cbs.disk_results
def _GetImportExportHandshakeMessage(version):
  """Builds the plain-text handshake message for a RIE protocol version.

  @type version: number
  @param version: Remote import/export protocol version

  """
  # Message format is "<version>:<handshake constant>"
  return "{0}:{1}".format(version, constants.RIE_HANDSHAKE)
def ComputeRemoteExportHandshake(cds):
  """Computes the remote import/export handshake.

  @type cds: string
  @param cds: Cluster domain secret
  @return: Tuple of (protocol version, HMAC digest, HMAC salt)

  """
  hmac_salt = utils.GenerateSecret(8)
  plain_msg = _GetImportExportHandshakeMessage(constants.RIE_VERSION)
  digest = utils.Sha1Hmac(cds, plain_msg, salt=hmac_salt)

  return (constants.RIE_VERSION, digest, hmac_salt)
def CheckRemoteExportHandshake(cds, handshake):
  """Checks the handshake of a remote import/export.

  @type cds: string
  @param cds: Cluster domain secret
  @type handshake: sequence
  @param handshake: Handshake sent by remote peer
  @return: C{None} if the handshake is valid, an error message string
    otherwise

  """
  try:
    (version, hmac_digest, hmac_salt) = handshake
  # "as" syntax is valid from Python 2.6 and forward-compatible with 3.x
  except (TypeError, ValueError) as err:
    return "Invalid data: %s" % err

  # Verify the HMAC first; a wrong secret is reported as such even if the
  # peer also uses a different protocol version
  if not utils.VerifySha1Hmac(cds, _GetImportExportHandshakeMessage(version),
                              hmac_digest, salt=hmac_salt):
    return "Hash didn't match, clusters don't share the same domain secret"

  if version != constants.RIE_VERSION:
    return ("Clusters don't have the same remote import/export protocol"
            " (local=%s, remote=%s)" %
            (constants.RIE_VERSION, version))

  return None
1516 def _GetRieDiskInfoMessage(disk_index, host, port, magic):
1517 """Returns the hashed text for import/export disk information.
1519 @type disk_index: number
1520 @param disk_index: Index of disk (included in hash)
1522 @param host: Hostname
1524 @param port: Daemon port
1526 @param magic: Magic value
1529 return "%s:%s:%s:%s" % (disk_index, host, port, magic)
def CheckRemoteExportDiskInfo(cds, disk_index, disk_info):
  """Verifies received disk information for an export.

  @type cds: string
  @param cds: Cluster domain secret
  @type disk_index: number
  @param disk_index: Index of disk (included in hash)
  @type disk_info: sequence
  @param disk_info: Disk information sent by remote peer
  @return: Tuple of (normalized host, validated port, magic)
  @raise errors.GenericError: If the data is malformed, incomplete or the
    HMAC doesn't verify

  """
  try:
    (host, port, magic, hmac_digest, hmac_salt) = disk_info
  # "as" syntax is valid from Python 2.6 and forward-compatible with 3.x
  except (TypeError, ValueError) as err:
    raise errors.GenericError("Invalid data: %s" % err)

  if not (host and port and magic):
    raise errors.GenericError("Missing destination host, port or magic")

  msg = _GetRieDiskInfoMessage(disk_index, host, port, magic)

  if not utils.VerifySha1Hmac(cds, msg, hmac_digest, salt=hmac_salt):
    raise errors.GenericError("HMAC is wrong")

  return (utils.HostInfo.NormalizeName(host),
          utils.ValidateServiceName(port),
          magic)
def ComputeRemoteImportDiskInfo(cds, salt, disk_index, host, port, magic):
  """Computes the signed disk information for a remote import.

  @type cds: string
  @param cds: Cluster domain secret
  @type salt: string
  @param salt: HMAC salt
  @type disk_index: number
  @param disk_index: Index of disk (included in hash)
  @type host: string
  @param host: Hostname
  @type port: number
  @param port: Daemon port
  @type magic: string
  @param magic: Magic value
  @return: Tuple of (host, port, magic, HMAC digest, salt), suitable for
    verification via L{CheckRemoteExportDiskInfo}

  """
  info_msg = _GetRieDiskInfoMessage(disk_index, host, port, magic)
  digest = utils.Sha1Hmac(cds, info_msg, salt=salt)

  return (host, port, magic, digest, salt)