4 # Copyright (C) 2010, 2011 Google Inc.
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 # General Public License for more details.
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
22 """Instance-related functions and classes for masterd.
30 from ganeti import constants
31 from ganeti import errors
32 from ganeti import compat
33 from ganeti import utils
34 from ganeti import objects
35 from ganeti import netutils
36 from ganeti import pathutils
# Internal signalling exception: raised by the daemon-driving methods below
# when a transfer goes bad, and caught by ImportExportLoop, which converts it
# into diskie.Finalize(error=...) instead of letting it propagate.
39 class _ImportExportError(Exception):
40 """Local exception to report import/export errors.
# Bundle of timeout/interval values used while supervising an import/export
# daemon.  Values are in seconds (they are compared against time.time()-based
# epochs via utils.TimeoutExpired, and error messages report them as seconds).
45 class ImportExportTimeouts(object):
46 #: Time until daemon starts writing status file
47 DEFAULT_READY_TIMEOUT = 10
49 #: Length of time until errors cause hard failure
50 DEFAULT_ERROR_TIMEOUT = 10
52 #: Time after which daemon must be listening
53 DEFAULT_LISTEN_TIMEOUT = 10
55 #: Progress update interval
56 DEFAULT_PROGRESS_INTERVAL = 60
66 def __init__(self, connect,
67 listen=DEFAULT_LISTEN_TIMEOUT,
68 error=DEFAULT_ERROR_TIMEOUT,
69 ready=DEFAULT_READY_TIMEOUT,
70 progress=DEFAULT_PROGRESS_INTERVAL):
71 """Initializes this class.
74 @param connect: Timeout for establishing connection
76 @param listen: Timeout for starting to listen for connections
78 @param error: Length of time until errors cause hard failure
80 @param ready: Timeout for daemon to become ready
81 @type progress: number
82 @param progress: Progress update interval
# NOTE(review): only the `connect` and `progress` assignments are visible in
# this excerpt; the `listen`/`error`/`ready` attributes are presumably stored
# here as well (they are read as self._timeouts.listen/.error/.ready below) —
# confirm against the full file.
88 self.connect = connect
89 self.progress = progress
# Callback interface invoked by _DiskImportExportBase as a transfer moves
# through its lifecycle (listening -> connected -> progress -> finished).
# Subclasses (e.g. the _TransferInst*Cb classes below) override the hooks
# they care about; the base implementations' bodies are not visible in this
# excerpt — presumably no-ops, confirm against the full file.
92 class ImportExportCbBase(object):
93 """Callbacks for disk import/export.
96 def ReportListening(self, ie, private, component):
97 """Called when daemon started listening.
99 @type ie: Subclass of L{_DiskImportExportBase}
100 @param ie: Import/export object
101 @param private: Private data passed to import/export object
102 @param component: transfer component name
106 def ReportConnected(self, ie, private):
107 """Called when a connection has been established.
109 @type ie: Subclass of L{_DiskImportExportBase}
110 @param ie: Import/export object
111 @param private: Private data passed to import/export object
115 def ReportProgress(self, ie, private):
116 """Called when new progress information should be reported.
118 @type ie: Subclass of L{_DiskImportExportBase}
119 @param ie: Import/export object
120 @param private: Private data passed to import/export object
124 def ReportFinished(self, ie, private):
125 """Called when a transfer has finished.
127 @type ie: Subclass of L{_DiskImportExportBase}
128 @param ie: Import/export object
129 @param private: Private data passed to import/export object
# Base class driving one remote import/export daemon over RPC.  Subclasses
# (DiskImport/DiskExport) must provide MODE_TEXT (asserted in __init__) and
# implement _StartDaemon, CheckListening and _GetConnectedCheckEpoch.  All
# state transitions are tracked via `self._ts_*` timestamps; `self.success`
# stays None while the transfer is active (see `active` property).
134 class _DiskImportExportBase(object):
137 def __init__(self, lu, node_name, opts,
138 instance, component, timeouts, cbs, private=None):
139 """Initializes this class.
141 @param lu: Logical unit instance
142 @type node_name: string
143 @param node_name: Node name for import
144 @type opts: L{objects.ImportExportOptions}
145 @param opts: Import/export daemon options
146 @type instance: L{objects.Instance}
147 @param instance: Instance object
148 @type component: string
149 @param component: which part of the instance is being imported
150 @type timeouts: L{ImportExportTimeouts}
151 @param timeouts: Timeouts for this import
152 @type cbs: L{ImportExportCbBase}
153 @param cbs: Callbacks
154 @param private: Private data for callback functions
157 assert self.MODE_TEXT
160 self.node_name = node_name
# Copy the options: connect_timeout is mutated just below and must not leak
# into the caller's object.
161 self._opts = opts.Copy()
162 self._instance = instance
163 self._component = component
164 self._timeouts = timeouts
166 self._private = private
168 # Set master daemon's timeout in options for import/export daemon
169 assert self._opts.connect_timeout is None
170 self._opts.connect_timeout = timeouts.connect
# Lifecycle timestamps; all None until the corresponding event happens.
176 self._ts_begin = None
177 self._ts_connected = None
178 self._ts_finished = None
179 self._ts_cleanup = None
180 self._ts_last_progress = None
181 self._ts_last_error = None
185 self.final_message = None
188 self._daemon_name = None
192 def recent_output(self):
193 """Returns the most recent output from the daemon.
197 return "\n".join(self._daemon.recent_output)
203 """Returns transfer progress information.
# 4-tuple consumed by FormatProgress(): (mbytes, throughput, percent, eta).
209 return (self._daemon.progress_mbytes,
210 self._daemon.progress_throughput,
211 self._daemon.progress_percent,
212 self._daemon.progress_eta)
216 """Returns the magic value for this import/export.
219 return self._opts.magic
223 """Determines whether this transport is still active.
# Active means _ReportFinished has not run yet (it sets self.success).
226 return self.success is None
230 """Returns parent loop.
232 @rtype: L{ImportExportLoop}
237 def SetLoop(self, loop):
238 """Sets the parent loop.
240 @type loop: L{ImportExportLoop}
244 raise errors.ProgrammerError("Loop can only be set once")
248 def _StartDaemon(self):
249 """Starts the import/export daemon.
252 raise NotImplementedError()
254 def CheckDaemon(self):
255 """Checks whether daemon has been started and if not, starts it.
# Must not be called after Finalize() (cleanup timestamp would be set).
261 assert self._ts_cleanup is None
263 if self._daemon_name is None:
264 assert self._ts_begin is None
266 result = self._StartDaemon()
268 raise _ImportExportError("Failed to start %s on %s: %s" %
269 (self.MODE_TEXT, self.node_name,
272 daemon_name = result.payload
274 logging.info("Started %s '%s' on %s", self.MODE_TEXT, daemon_name,
277 self._ts_begin = time.time()
278 self._daemon_name = daemon_name
280 return self._daemon_name
282 def GetDaemonName(self):
283 """Returns the daemon name.
286 assert self._daemon_name, "Daemon has not been started"
287 assert self._ts_cleanup is None
288 return self._daemon_name
291 """Sends SIGTERM to import/export daemon (if still active).
# Best-effort: an RPC failure to abort is only logged, never raised.
294 if self._daemon_name:
295 self._lu.LogWarning("Aborting %s '%s' on %s",
296 self.MODE_TEXT, self._daemon_name, self.node_name)
297 result = self._lu.rpc.call_impexp_abort(self.node_name, self._daemon_name)
299 self._lu.LogWarning("Failed to abort %s '%s' on %s: %s",
300 self.MODE_TEXT, self._daemon_name,
301 self.node_name, result.fail_msg)
306 def _SetDaemonData(self, data):
307 """Internal function for updating status daemon data.
309 @type data: L{objects.ImportExportStatus}
310 @param data: Daemon status data
313 assert self._ts_begin is not None
# No status data yet: fatal once the ready timeout has expired since start.
316 if utils.TimeoutExpired(self._ts_begin, self._timeouts.ready):
317 raise _ImportExportError("Didn't become ready after %s seconds" %
318 self._timeouts.ready)
326 def SetDaemonData(self, success, data):
327 """Updates daemon status data.
330 @param success: Whether fetching data was successful or not
331 @type data: L{objects.ImportExportStatus}
332 @param data: Daemon status data
# Transient fetch failures are tolerated: the first failure starts the error
# clock, a failure persisting past timeouts.error becomes fatal, and any
# successful fetch resets the clock.
336 if self._ts_last_error is None:
337 self._ts_last_error = time.time()
339 elif utils.TimeoutExpired(self._ts_last_error, self._timeouts.error):
340 raise _ImportExportError("Too many errors while updating data")
344 self._ts_last_error = None
346 return self._SetDaemonData(data)
348 def CheckListening(self):
349 """Checks whether the daemon is listening.
352 raise NotImplementedError()
354 def _GetConnectedCheckEpoch(self):
355 """Returns timeout to calculate connect timeout.
358 raise NotImplementedError()
360 def CheckConnected(self):
361 """Checks whether the daemon is connected.
364 @return: Whether the daemon is connected
367 assert self._daemon, "Daemon status missing"
# Already connected earlier; nothing more to check.
369 if self._ts_connected is not None:
372 if self._daemon.connected:
373 self._ts_connected = time.time()
375 # TODO: Log remote peer
376 logging.debug("%s '%s' on %s is now connected",
377 self.MODE_TEXT, self._daemon_name, self.node_name)
379 self._cbs.ReportConnected(self, self._private)
# Connect timeout is measured from the subclass-specific epoch (listening
# time for imports, start time for exports).
383 if utils.TimeoutExpired(self._GetConnectedCheckEpoch(),
384 self._timeouts.connect):
385 raise _ImportExportError("Not connected after %s seconds" %
386 self._timeouts.connect)
390 def _CheckProgress(self):
391 """Checks whether a progress update should be reported.
# Report only when (a) the progress interval has elapsed (or nothing was
# reported yet) and (b) the daemon has actual progress numbers.
394 if ((self._ts_last_progress is None or
395 utils.TimeoutExpired(self._ts_last_progress,
396 self._timeouts.progress)) and
398 self._daemon.progress_mbytes is not None and
399 self._daemon.progress_throughput is not None):
400 self._cbs.ReportProgress(self, self._private)
401 self._ts_last_progress = time.time()
403 def CheckFinished(self):
404 """Checks whether the daemon exited.
407 @return: Whether the transfer is finished
410 assert self._daemon, "Daemon status missing"
412 if self._ts_finished:
# Still running: only emit a progress update.
415 if self._daemon.exit_status is None:
416 # TODO: Adjust delay for ETA expiring soon
417 self._CheckProgress()
420 self._ts_finished = time.time()
# Exit status 0 counts as success; the daemon's error message is recorded.
422 self._ReportFinished(self._daemon.exit_status == 0,
423 self._daemon.error_message)
427 def _ReportFinished(self, success, message):
428 """Transfer is finished or daemon exited.
431 @param success: Whether the transfer was successful
432 @type message: string
433 @param message: Error message
436 assert self.success is None
438 self.success = success
439 self.final_message = message
442 logging.info("%s '%s' on %s succeeded", self.MODE_TEXT,
443 self._daemon_name, self.node_name)
444 elif self._daemon_name:
445 self._lu.LogWarning("%s '%s' on %s failed: %s",
446 self.MODE_TEXT, self._daemon_name, self.node_name,
449 self._lu.LogWarning("%s on %s failed: %s", self.MODE_TEXT,
450 self.node_name, message)
452 self._cbs.ReportFinished(self, self._private)
455 """Makes the RPC call to finalize this import/export.
458 return self._lu.rpc.call_impexp_cleanup(self.node_name, self._daemon_name)
460 def Finalize(self, error=None):
461 """Finalizes this import/export.
# Cleans up the remote daemon (best-effort: RPC failure is only logged) and,
# when an error string is given, records the transfer as failed.
464 if self._daemon_name:
465 logging.info("Finalizing %s '%s' on %s",
466 self.MODE_TEXT, self._daemon_name, self.node_name)
468 result = self._Finalize()
470 self._lu.LogWarning("Failed to finalize %s '%s' on %s: %s",
471 self.MODE_TEXT, self._daemon_name,
472 self.node_name, result.fail_msg)
475 # Daemon is no longer running
476 self._daemon_name = None
477 self._ts_cleanup = time.time()
480 self._ReportFinished(False, error)
# Import side of a disk transfer: starts a daemon on the destination node
# that listens on a port; the matching DiskExport (started once listening is
# reported, see _TransferInstDestCb.ReportListening) connects to it.
485 class DiskImport(_DiskImportExportBase):
488 def __init__(self, lu, node_name, opts, instance, component,
489 dest, dest_args, timeouts, cbs, private=None):
490 """Initializes this class.
492 @param lu: Logical unit instance
493 @type node_name: string
494 @param node_name: Node name for import
495 @type opts: L{objects.ImportExportOptions}
496 @param opts: Import/export daemon options
497 @type instance: L{objects.Instance}
498 @param instance: Instance object
499 @type component: string
500 @param component: which part of the instance is being imported
501 @param dest: I/O destination
502 @param dest_args: I/O arguments
503 @type timeouts: L{ImportExportTimeouts}
504 @param timeouts: Timeouts for this import
505 @type cbs: L{ImportExportCbBase}
506 @param cbs: Callbacks
507 @param private: Private data for callback functions
510 _DiskImportExportBase.__init__(self, lu, node_name, opts, instance,
511 component, timeouts, cbs, private)
513 self._dest_args = dest_args
# Timestamp of when the daemon started listening; None until then.
516 self._ts_listening = None
519 def listen_port(self):
520 """Returns the port the daemon is listening on.
524 return self._daemon.listen_port
528 def _StartDaemon(self):
529 """Starts the import daemon.
532 return self._lu.rpc.call_import_start(self.node_name, self._opts,
533 self._instance, self._component,
534 (self._dest, self._dest_args))
536 def CheckListening(self):
537 """Checks whether the daemon is listening.
540 @return: Whether the daemon is listening
543 assert self._daemon, "Daemon status missing"
# Already listening from an earlier check.
545 if self._ts_listening is not None:
548 port = self._daemon.listen_port
550 self._ts_listening = time.time()
552 logging.debug("Import '%s' on %s is now listening on port %s",
553 self._daemon_name, self.node_name, port)
# Notify callbacks so the corresponding export can be started.
555 self._cbs.ReportListening(self, self._private, self._component)
559 if utils.TimeoutExpired(self._ts_begin, self._timeouts.listen):
560 raise _ImportExportError("Not listening after %s seconds" %
561 self._timeouts.listen)
565 def _GetConnectedCheckEpoch(self):
566 """Returns the time since we started listening.
569 assert self._ts_listening is not None, \
570 ("Checking whether an import is connected is only useful"
571 " once it's been listening")
573 return self._ts_listening
# Export side of a disk transfer: the daemon actively connects to
# dest_host:dest_port (the listening import daemon) and sends the data.
576 class DiskExport(_DiskImportExportBase):
579 def __init__(self, lu, node_name, opts, dest_host, dest_port,
580 instance, component, source, source_args,
581 timeouts, cbs, private=None):
582 """Initializes this class.
584 @param lu: Logical unit instance
585 @type node_name: string
586 @param node_name: Node name for import
587 @type opts: L{objects.ImportExportOptions}
588 @param opts: Import/export daemon options
589 @type dest_host: string
590 @param dest_host: Destination host name or IP address
591 @type dest_port: number
592 @param dest_port: Destination port number
593 @type instance: L{objects.Instance}
594 @param instance: Instance object
595 @type component: string
596 @param component: which part of the instance is being imported
597 @param source: I/O source
598 @param source_args: I/O source
599 @type timeouts: L{ImportExportTimeouts}
600 @param timeouts: Timeouts for this import
601 @type cbs: L{ImportExportCbBase}
602 @param cbs: Callbacks
603 @param private: Private data for callback functions
606 _DiskImportExportBase.__init__(self, lu, node_name, opts, instance,
607 component, timeouts, cbs, private)
608 self._dest_host = dest_host
609 self._dest_port = dest_port
610 self._source = source
611 self._source_args = source_args
613 def _StartDaemon(self):
614 """Starts the export daemon.
617 return self._lu.rpc.call_export_start(self.node_name, self._opts,
618 self._dest_host, self._dest_port,
619 self._instance, self._component,
620 (self._source, self._source_args))
622 def CheckListening(self):
623 """Checks whether the daemon is listening.
626 # Only an import can be listening
629 def _GetConnectedCheckEpoch(self):
630 """Returns the time since the daemon started.
# Exports connect outwards, so the connect timeout runs from daemon start.
633 assert self._ts_begin is not None
635 return self._ts_begin
# Turns the 4-tuple from _DiskImportExportBase.progress into a single
# human-readable, comma-joined string; percent and ETA are optional parts.
638 def FormatProgress(progress):
639 """Formats progress information for user consumption
642 (mbytes, throughput, percent, eta) = progress
645 utils.FormatUnit(mbytes, "h"),
647 # Not using FormatUnit as it doesn't support kilobytes
648 "%0.1f MiB/s" % throughput,
651 if percent is not None:
652 parts.append("%d%%" % percent)
655 parts.append("ETA %s" % utils.FormatSeconds(eta))
657 return utils.CommaJoin(parts)
# Event loop multiplexing many concurrent disk imports/exports: repeatedly
# starts daemons, polls their status via RPC, feeds results back into each
# _DiskImportExportBase object and sleeps an adaptive delay between rounds.
660 class ImportExportLoop:
664 def __init__(self, lu):
665 """Initializes this class.
670 self._pending_add = []
672 def Add(self, diskie):
673 """Adds an import/export object to the loop.
675 @type diskie: Subclass of L{_DiskImportExportBase}
676 @param diskie: Import/export object
679 assert diskie not in self._pending_add
680 assert diskie.loop is None
684 # Adding new objects to a staging list is necessary, otherwise the main
685 # loop gets confused if callbacks modify the queue while the main loop is
687 self._pending_add.append(diskie)
690 def _CollectDaemonStatus(lu, daemons):
691 """Collects the status for all import/export daemons.
# One status RPC per node, covering all daemon names on that node; failures
# are logged and that node is simply absent from the result.
696 for node_name, names in daemons.iteritems():
697 result = lu.rpc.call_impexp_status(node_name, names)
699 lu.LogWarning("Failed to get daemon status on %s: %s",
700 node_name, result.fail_msg)
703 assert len(names) == len(result.payload)
705 daemon_status[node_name] = dict(zip(names, result.payload))
710 def _GetActiveDaemonNames(queue):
711 """Gets the names of all active daemons.
716 if not diskie.active:
720 # Start daemon if necessary
721 daemon_name = diskie.CheckDaemon()
# A failed daemon start finalizes that transfer with the error instead of
# aborting the whole loop.
722 except _ImportExportError, err:
723 logging.exception("%s failed", diskie.MODE_TEXT)
724 diskie.Finalize(error=str(err))
727 result.setdefault(diskie.node_name, []).append(daemon_name)
729 assert len(queue) >= len(result)
730 assert len(queue) >= sum([len(names) for names in result.itervalues()])
732 logging.debug("daemons=%r", result)
736 def _AddPendingToQueue(self):
737 """Adds all pending import/export objects to the internal queue.
740 assert compat.all(diskie not in self._queue and diskie.loop == self
741 for diskie in self._pending_add)
743 self._queue.extend(self._pending_add)
745 del self._pending_add[:]
748 """Utility main loop.
752 self._AddPendingToQueue()
754 # Collect all active daemon names
755 daemons = self._GetActiveDaemonNames(self._queue)
759 # Collection daemon status data
760 data = self._CollectDaemonStatus(self._lu, daemons)
# Each check below can shorten the sleep; delay never exceeds MAX_DELAY.
763 delay = self.MAX_DELAY
764 for diskie in self._queue:
765 if not diskie.active:
770 all_daemon_data = data[diskie.node_name]
# Status fetch failed for this daemon; feed the failure into the error
# accounting in SetDaemonData.
772 result = diskie.SetDaemonData(False, None)
775 diskie.SetDaemonData(True,
776 all_daemon_data[diskie.GetDaemonName()])
779 # Daemon not yet ready, retry soon
780 delay = min(3.0, delay)
783 if diskie.CheckFinished():
788 # Normal case: check again in 5 seconds
789 delay = min(5.0, delay)
791 if not diskie.CheckListening():
792 # Not yet listening, retry soon
793 delay = min(1.0, delay)
796 if not diskie.CheckConnected():
797 # Not yet connected, retry soon
798 delay = min(1.0, delay)
801 except _ImportExportError, err:
802 logging.exception("%s failed", diskie.MODE_TEXT)
803 diskie.Finalize(error=str(err))
# Loop exits once no transfer in the queue is still active.
805 if not compat.any(diskie.active for diskie in self._queue):
809 delay = min(self.MAX_DELAY, max(self.MIN_DELAY, delay))
810 logging.debug("Waiting for %ss", delay)
813 def FinalizeAll(self):
814 """Finalizes all pending transfers.
# Overall success is the conjunction of every transfer's Finalize() result.
819 for diskie in self._queue:
820 success = diskie.Finalize() and success
# Shared state holder for the source/destination callback classes used by
# TransferInstanceData; simply stores the contextual objects on self.
825 class _TransferInstCbBase(ImportExportCbBase):
826 def __init__(self, lu, feedback_fn, instance, timeouts, src_node, src_cbs,
828 """Initializes this class.
831 ImportExportCbBase.__init__(self)
834 self.feedback_fn = feedback_fn
835 self.instance = instance
836 self.timeouts = timeouts
837 self.src_node = src_node
838 self.src_cbs = src_cbs
839 self.dest_node = dest_node
840 self.dest_ip = dest_ip
# Callbacks for the sending (DiskExport) side of an intra-cluster transfer;
# `dtp` is the _DiskTransferPrivate shared with the receiving side.
843 class _TransferInstSourceCb(_TransferInstCbBase):
844 def ReportConnected(self, ie, dtp):
845 """Called when a connection has been established.
848 assert self.src_cbs is None
849 assert dtp.src_export == ie
850 assert dtp.dest_import
852 self.feedback_fn("%s is sending data on %s" %
853 (dtp.data.name, ie.node_name))
855 def ReportProgress(self, ie, dtp):
856 """Called when new progress information should be reported.
859 progress = ie.progress
863 self.feedback_fn("%s sent %s" % (dtp.data.name, FormatProgress(progress)))
865 def ReportFinished(self, ie, dtp):
866 """Called when a transfer has finished.
869 assert self.src_cbs is None
870 assert dtp.src_export == ie
871 assert dtp.dest_import
874 self.feedback_fn("%s finished sending data" % dtp.data.name)
876 self.feedback_fn("%s failed to send data: %s (recent output: %s)" %
877 (dtp.data.name, ie.final_message, ie.recent_output))
# Fold this side's result into the shared transfer status.
879 dtp.RecordResult(ie.success)
881 cb = dtp.data.finished_fn
885 # TODO: Check whether sending SIGTERM right away is okay, maybe we should
886 # give the daemon a moment to sort things out
# A failed export drags down the paired import as well.
887 if dtp.dest_import and not ie.success:
888 dtp.dest_import.Abort()
# Callbacks for the receiving (DiskImport) side; once the import daemon is
# listening, the matching export is created and added to the loop.
891 class _TransferInstDestCb(_TransferInstCbBase):
892 def ReportListening(self, ie, dtp, component):
893 """Called when daemon started listening.
897 assert dtp.src_export is None
898 assert dtp.dest_import
899 assert dtp.export_opts
901 self.feedback_fn("%s is now listening, starting export" % dtp.data.name)
903 # Start export on source node
904 de = DiskExport(self.lu, self.src_node, dtp.export_opts,
905 self.dest_ip, ie.listen_port, self.instance,
906 component, dtp.data.src_io, dtp.data.src_ioargs,
907 self.timeouts, self.src_cbs, private=dtp)
912 def ReportConnected(self, ie, dtp):
913 """Called when a connection has been established.
916 self.feedback_fn("%s is receiving data on %s" %
917 (dtp.data.name, self.dest_node))
919 def ReportFinished(self, ie, dtp):
920 """Called when a transfer has finished.
924 self.feedback_fn("%s finished receiving data" % dtp.data.name)
926 self.feedback_fn("%s failed to receive data: %s (recent output: %s)" %
927 (dtp.data.name, ie.final_message, ie.recent_output))
# Fold this side's result into the shared transfer status.
929 dtp.RecordResult(ie.success)
931 # TODO: Check whether sending SIGTERM right away is okay, maybe we should
932 # give the daemon a moment to sort things out
# A failed import drags down the paired export as well.
933 if dtp.src_export and not ie.success:
934 dtp.src_export.Abort()
# Plain value object describing one disk transfer (I/O endpoints plus an
# optional completion callback); consumed by TransferInstanceData.
937 class DiskTransfer(object):
938 def __init__(self, name, src_io, src_ioargs, dest_io, dest_ioargs,
940 """Initializes this class.
943 @param name: User-visible name for this transfer (e.g. "disk/0")
944 @param src_io: Source I/O type
945 @param src_ioargs: Source I/O arguments
946 @param dest_io: Destination I/O type
947 @param dest_ioargs: Destination I/O arguments
948 @type finished_fn: callable
949 @param finished_fn: Function called once transfer has finished
955 self.src_ioargs = src_ioargs
957 self.dest_io = dest_io
958 self.dest_ioargs = dest_ioargs
960 self.finished_fn = finished_fn
# Mutable record shared (as `private`) between the import and export sides
# of one DiskTransfer; accumulates overall success across both halves.
963 class _DiskTransferPrivate(object):
964 def __init__(self, data, success, export_opts):
965 """Initializes this class.
967 @type data: L{DiskTransfer}
972 self.success = success
973 self.export_opts = export_opts
975 self.src_export = None
976 self.dest_import = None
978 def RecordResult(self, success):
979 """Updates the status.
981 One failed part will cause the whole transfer to fail.
# AND-accumulate: success only stays true if every recorded part succeeded.
984 self.success = self.success and success
# Derives a per-disk magic token by SHA1-hashing the RIE version, the
# instance name and the disk index (and presumably the `base` seed as well —
# that update call is not visible in this excerpt, confirm).
987 def _GetInstDiskMagic(base, instance_name, index):
988 """Computes the magic value for a disk export or import.
991 @param base: Random seed value (can be the same for all disks of a transfer)
992 @type instance_name: string
993 @param instance_name: Name of instance
995 @param index: Disk index
998 h = compat.sha1_hash()
999 h.update(str(constants.RIE_VERSION))
1001 h.update(instance_name)
1002 h.update(str(index))
1003 return h.hexdigest()
1006 def TransferInstanceData(lu, feedback_fn, src_node, dest_node, dest_ip,
1007 instance, all_transfers):
1008 """Transfers an instance's data from one node to another.
1010 @param lu: Logical unit instance
1011 @param feedback_fn: Feedback function
1012 @type src_node: string
1013 @param src_node: Source node name
1014 @type dest_node: string
1015 @param dest_node: Destination node name
1016 @type dest_ip: string
1017 @param dest_ip: IP address of destination node
1018 @type instance: L{objects.Instance}
1019 @param instance: Instance object
1020 @type all_transfers: list of L{DiskTransfer} instances
1021 @param all_transfers: List of all disk transfers to be made
1023 @return: List with a boolean (True=successful, False=failed) for success for
1027 # Disable compression for all moves as these are all within the same cluster
1028 compress = constants.IEC_NONE
1030 logging.debug("Source node %s, destination node %s, compression '%s'",
1031 src_node, dest_node, compress)
1033 timeouts = ImportExportTimeouts(constants.DISK_TRANSFER_CONNECT_TIMEOUT)
# Two callback sets: the destination one carries a reference to the source
# one so it can start the export once the import daemon is listening.
1034 src_cbs = _TransferInstSourceCb(lu, feedback_fn, instance, timeouts,
1035 src_node, None, dest_node, dest_ip)
1036 dest_cbs = _TransferInstDestCb(lu, feedback_fn, instance, timeouts,
1037 src_node, src_cbs, dest_node, dest_ip)
1041 base_magic = utils.GenerateSecret(6)
1043 ieloop = ImportExportLoop(lu)
1045 for idx, transfer in enumerate(all_transfers):
1047 feedback_fn("Exporting %s from %s to %s" %
1048 (transfer.name, src_node, dest_node))
# Per-disk magic ties each import daemon to its matching export.
1050 magic = _GetInstDiskMagic(base_magic, instance.name, idx)
1051 opts = objects.ImportExportOptions(key_name=None, ca_pem=None,
1052 compress=compress, magic=magic)
1054 dtp = _DiskTransferPrivate(transfer, True, opts)
1056 di = DiskImport(lu, dest_node, opts, instance, "disk%d" % idx,
1057 transfer.dest_io, transfer.dest_ioargs,
1058 timeouts, dest_cbs, private=dtp)
1061 dtp.dest_import = di
# Placeholder private for a transfer slot without a real transfer
# (recorded as failed from the start).
1063 dtp = _DiskTransferPrivate(None, False, None)
1069 ieloop.FinalizeAll()
1071 assert len(all_dtp) == len(all_transfers)
1072 assert compat.all((dtp.src_export is None or
1073 dtp.src_export.success is not None) and
1074 (dtp.dest_import is None or
1075 dtp.dest_import.success is not None)
1076 for dtp in all_dtp), \
1077 "Not all imports/exports are finalized"
1079 return [bool(dtp.success) for dtp in all_dtp]
# Callbacks for an inter-cluster export; `private` is the tuple
# (disk index, finished_fn) passed when the DiskExport was created.
1082 class _RemoteExportCb(ImportExportCbBase):
1083 def __init__(self, feedback_fn, disk_count):
1084 """Initializes this class.
1087 ImportExportCbBase.__init__(self)
1088 self._feedback_fn = feedback_fn
# One result slot per disk; stays None until ReportFinished fills it.
1089 self._dresults = [None] * disk_count
1092 def disk_results(self):
1093 """Returns per-disk results.
1096 return self._dresults
1098 def ReportConnected(self, ie, private):
1099 """Called when a connection has been established.
1104 self._feedback_fn("Disk %s is now sending data" % idx)
1106 def ReportProgress(self, ie, private):
1107 """Called when new progress information should be reported.
1112 progress = ie.progress
1116 self._feedback_fn("Disk %s sent %s" % (idx, FormatProgress(progress)))
1118 def ReportFinished(self, ie, private):
1119 """Called when a transfer has finished.
1122 (idx, finished_fn) = private
1125 self._feedback_fn("Disk %s finished sending data" % idx)
1127 self._feedback_fn("Disk %s failed to send data: %s (recent output: %s)" %
1128 (idx, ie.final_message, ie.recent_output))
1130 self._dresults[idx] = bool(ie.success)
# Drives an instance export: snapshots every disk (LVM), transfers the
# snapshots (locally via TransferInstanceData or remotely via DiskExport),
# and removes each snapshot once its transfer finishes.
1136 class ExportInstanceHelper:
1137 def __init__(self, lu, feedback_fn, instance):
1138 """Initializes this class.
1140 @param lu: Logical unit instance
1141 @param feedback_fn: Feedback function
1142 @type instance: L{objects.Instance}
1143 @param instance: Instance object
1147 self._feedback_fn = feedback_fn
1148 self._instance = instance
# Parallel bookkeeping: one snapshot entry and one removed-flag per disk.
1150 self._snap_disks = []
1151 self._removed_snaps = [False] * len(instance.disks)
1153 def CreateSnapshots(self):
1154 """Creates an LVM snapshot for every disk of the instance.
1157 assert not self._snap_disks
1159 instance = self._instance
1160 src_node = instance.primary_node
1162 for idx, disk in enumerate(instance.disks):
1163 self._feedback_fn("Creating a snapshot of disk/%s on node %s" %
1166 # result.payload will be a snapshot of an lvm leaf of the one we
1168 result = self._lu.rpc.call_blockdev_snapshot(src_node, (disk, instance))
1170 msg = result.fail_msg
1172 self._lu.LogWarning("Could not snapshot disk/%s on node %s: %s",
# Malformed payload is also treated as a snapshot failure (warn, skip).
1174 elif (not isinstance(result.payload, (tuple, list)) or
1175 len(result.payload) != 2):
1176 self._lu.LogWarning("Could not snapshot disk/%s on node %s: invalid"
1177 " result '%s'", idx, src_node, result.payload)
1179 disk_id = tuple(result.payload)
1180 disk_params = constants.DISK_LD_DEFAULTS[constants.LD_LV].copy()
1181 new_dev = objects.Disk(dev_type=constants.LD_LV, size=disk.size,
1182 logical_id=disk_id, physical_id=disk_id,
1183 iv_name=disk.iv_name,
1186 self._snap_disks.append(new_dev)
1188 assert len(self._snap_disks) == len(instance.disks)
1189 assert len(self._removed_snaps) == len(instance.disks)
1191 def _RemoveSnapshot(self, disk_index):
1192 """Removes an LVM snapshot.
1194 @type disk_index: number
1195 @param disk_index: Index of the snapshot to be removed
# Idempotent: skipped if the snapshot is missing or was already removed.
1198 disk = self._snap_disks[disk_index]
1199 if disk and not self._removed_snaps[disk_index]:
1200 src_node = self._instance.primary_node
1202 self._feedback_fn("Removing snapshot of disk/%s on node %s" %
1203 (disk_index, src_node))
1205 result = self._lu.rpc.call_blockdev_remove(src_node, disk)
1207 self._lu.LogWarning("Could not remove snapshot for disk/%d from node"
1208 " %s: %s", disk_index, src_node, result.fail_msg)
1210 self._removed_snaps[disk_index] = True
1212 def LocalExport(self, dest_node):
1213 """Intra-cluster instance export.
1215 @type dest_node: L{objects.Node}
1216 @param dest_node: Destination node
1219 instance = self._instance
1220 src_node = instance.primary_node
1222 assert len(self._snap_disks) == len(instance.disks)
1226 for idx, dev in enumerate(self._snap_disks):
# No snapshot for this disk: keep list positions aligned with a None entry.
1228 transfers.append(None)
1231 path = utils.PathJoin(pathutils.EXPORT_DIR, "%s.new" % instance.name,
1234 finished_fn = compat.partial(self._TransferFinished, idx)
1236 # FIXME: pass debug option from opcode to backend
1237 dt = DiskTransfer("snapshot/%s" % idx,
1238 constants.IEIO_SCRIPT, (dev, idx),
1239 constants.IEIO_FILE, (path, ),
1241 transfers.append(dt)
1243 # Actually export data
1244 dresults = TransferInstanceData(self._lu, self._feedback_fn,
1245 src_node, dest_node.name,
1246 dest_node.secondary_ip,
1247 instance, transfers)
1249 assert len(dresults) == len(instance.disks)
1251 self._feedback_fn("Finalizing export on %s" % dest_node.name)
1252 result = self._lu.rpc.call_finalize_export(dest_node.name, instance,
1254 msg = result.fail_msg
1257 self._lu.LogWarning("Could not finalize export for instance %s"
1258 " on node %s: %s", instance.name, dest_node.name, msg)
# Returns (finalization success, per-disk transfer results).
1260 return (fin_resu, dresults)
1262 def RemoteExport(self, disk_info, key_name, dest_ca_pem, timeouts):
1263 """Inter-cluster instance export.
1265 @type disk_info: list
1266 @param disk_info: Per-disk destination information
1267 @type key_name: string
1268 @param key_name: Name of X509 key to use
1269 @type dest_ca_pem: string
1270 @param dest_ca_pem: Destination X509 CA in PEM format
1271 @type timeouts: L{ImportExportTimeouts}
1272 @param timeouts: Timeouts for this import
1275 instance = self._instance
1277 assert len(disk_info) == len(instance.disks)
1279 cbs = _RemoteExportCb(self._feedback_fn, len(instance.disks))
1281 ieloop = ImportExportLoop(self._lu)
1283 for idx, (dev, (host, port, magic)) in enumerate(zip(instance.disks,
1285 # Decide whether to use IPv6
1286 ipv6 = netutils.IP6Address.IsValid(host)
1288 opts = objects.ImportExportOptions(key_name=key_name,
1290 magic=magic, ipv6=ipv6)
1292 self._feedback_fn("Sending disk %s to %s:%s" % (idx, host, port))
1293 finished_fn = compat.partial(self._TransferFinished, idx)
1294 ieloop.Add(DiskExport(self._lu, instance.primary_node,
1295 opts, host, port, instance, "disk%d" % idx,
1296 constants.IEIO_SCRIPT, (dev, idx),
1297 timeouts, cbs, private=(idx, finished_fn)))
1301 ieloop.FinalizeAll()
1303 return (True, cbs.disk_results)
1305 def _TransferFinished(self, idx):
1306 """Called once a transfer has finished.
1309 @param idx: Disk index
1312 logging.debug("Transfer %s finished", idx)
# Snapshot is no longer needed once its data has been shipped.
1313 self._RemoveSnapshot(idx)
1316 """Remove all snapshots.
1319 assert len(self._removed_snaps) == len(self._instance.disks)
1320 for idx in range(len(self._instance.disks)):
1321 self._RemoveSnapshot(idx)
# Callbacks for an inter-cluster import: collects the listening port and
# magic of every disk's import daemon and, once all are listening, publishes
# the signed connection information to the client via feedback_fn.
1324 class _RemoteImportCb(ImportExportCbBase):
1325 def __init__(self, feedback_fn, cds, x509_cert_pem, disk_count,
1327 """Initializes this class.
1330 @param cds: Cluster domain secret
1331 @type x509_cert_pem: string
1332 @param x509_cert_pem: CA used for signing import key
1333 @type disk_count: number
1334 @param disk_count: Number of disks
1335 @type external_address: string
1336 @param external_address: External address of destination node
1339 ImportExportCbBase.__init__(self)
1340 self._feedback_fn = feedback_fn
1342 self._x509_cert_pem = x509_cert_pem
1343 self._disk_count = disk_count
1344 self._external_address = external_address
1346 self._dresults = [None] * disk_count
# Per-disk (port, magic) while listening; reset to None once finished.
1347 self._daemon_port = [None] * disk_count
1349 self._salt = utils.GenerateSecret(8)
1352 def disk_results(self):
1353 """Returns per-disk results.
1356 return self._dresults
1358 def _CheckAllListening(self):
1359 """Checks whether all daemons are listening.
1361 If all daemons are listening, the information is sent to the client.
# Bail out until every disk slot has reported a (port, magic) pair.
1364 if not compat.all(dp is not None for dp in self._daemon_port):
1367 host = self._external_address
1370 for idx, (port, magic) in enumerate(self._daemon_port):
1371 disks.append(ComputeRemoteImportDiskInfo(self._cds, self._salt,
1372 idx, host, port, magic))
1374 assert len(disks) == self._disk_count
1376 self._feedback_fn(constants.ELOG_REMOTE_IMPORT, {
1378 "x509_ca": self._x509_cert_pem,
1381 def ReportListening(self, ie, private, _):
1382 """Called when daemon started listening.
1387 self._feedback_fn("Disk %s is now listening" % idx)
1389 assert self._daemon_port[idx] is None
1391 self._daemon_port[idx] = (ie.listen_port, ie.magic)
1393 self._CheckAllListening()
1395 def ReportConnected(self, ie, private):
1396 """Called when a connection has been established.
1401 self._feedback_fn("Disk %s is now receiving data" % idx)
1403 def ReportFinished(self, ie, private):
1404 """Called when a transfer has finished.
1409 # Daemon is certainly no longer listening
1410 self._daemon_port[idx] = None
1413 self._feedback_fn("Disk %s finished receiving data" % idx)
1415 self._feedback_fn(("Disk %s failed to receive data: %s"
1416 " (recent output: %s)") %
1417 (idx, ie.final_message, ie.recent_output))
1419 self._dresults[idx] = bool(ie.success)
def RemoteImport(lu, feedback_fn, instance, pnode, source_x509_ca,
                 cds, timeouts):
  """Imports an instance from another cluster.

  @param lu: Logical unit instance
  @param feedback_fn: Feedback function
  @type instance: L{objects.Instance}
  @param instance: Instance object
  @type pnode: L{objects.Node}
  @param pnode: Primary node of instance as an object
  @type source_x509_ca: OpenSSL.crypto.X509
  @param source_x509_ca: Import source's X509 CA
  @type cds: string
  @param cds: Cluster domain secret
  @type timeouts: L{ImportExportTimeouts}
  @param timeouts: Timeouts for this import
  @return: Per-disk results (see L{_RemoteImportCb.disk_results})

  """
  source_ca_pem = OpenSSL.crypto.dump_certificate(OpenSSL.crypto.FILETYPE_PEM,
                                                  source_x509_ca)

  magic_base = utils.GenerateSecret(6)

  # Decide whether to use IPv6
  ipv6 = netutils.IP6Address.IsValid(pnode.primary_ip)

  # Create crypto key and certificate on the instance's primary node;
  # removed again once all transfers are done
  result = lu.rpc.call_x509_cert_create(instance.primary_node,
                                        constants.RIE_CERT_VALIDITY)
  result.Raise("Can't create X509 key and certificate on %s" % result.node)

  (x509_key_name, x509_cert_pem) = result.payload

  # Load certificate returned by the remote node
  x509_cert = OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_PEM,
                                              x509_cert_pem)

  # Sign certificate with the cluster domain secret so the source cluster
  # can verify it
  signed_x509_cert_pem = \
    utils.SignX509Certificate(x509_cert, cds, utils.GenerateSecret(8))

  cbs = _RemoteImportCb(feedback_fn, cds, signed_x509_cert_pem,
                        len(instance.disks), pnode.primary_ip)

  ieloop = ImportExportLoop(lu)

  for idx, dev in enumerate(instance.disks):
    # Per-disk magic value, so mixed-up connections can be detected
    magic = _GetInstDiskMagic(magic_base, instance.name, idx)

    # Import daemon options
    opts = objects.ImportExportOptions(key_name=x509_key_name,
                                       ca_pem=source_ca_pem,
                                       magic=magic, ipv6=ipv6)

    ieloop.Add(DiskImport(lu, instance.primary_node, opts, instance,
                          "disk%d" % idx,
                          constants.IEIO_SCRIPT, (dev, idx),
                          timeouts, cbs, private=(idx, )))

  ieloop.FinalizeAll()

  # Remove crypto key and certificate
  result = lu.rpc.call_x509_cert_remove(instance.primary_node, x509_key_name)
  result.Raise("Can't remove X509 key and certificate on %s" % result.node)

  return cbs.disk_results
def _GetImportExportHandshakeMessage(version):
  """Returns the handshake message for a RIE protocol version.

  @type version: number
  @param version: Remote import/export protocol version
  @rtype: string
  @return: Text of the form "<version>:<handshake constant>"

  """
  return ":".join([str(version), constants.RIE_HANDSHAKE])
def ComputeRemoteExportHandshake(cds):
  """Computes the remote import/export handshake.

  @type cds: string
  @param cds: Cluster domain secret
  @return: Tuple of (protocol version, HMAC digest, HMAC salt)

  """
  version = constants.RIE_VERSION
  hmac_salt = utils.GenerateSecret(8)
  hmac_digest = utils.Sha1Hmac(cds, _GetImportExportHandshakeMessage(version),
                               salt=hmac_salt)
  return (version, hmac_digest, hmac_salt)
1513 def CheckRemoteExportHandshake(cds, handshake):
1514 """Checks the handshake of a remote import/export.
1517 @param cds: Cluster domain secret
1518 @type handshake: sequence
1519 @param handshake: Handshake sent by remote peer
1523 (version, hmac_digest, hmac_salt) = handshake
1524 except (TypeError, ValueError), err:
1525 return "Invalid data: %s" % err
1527 if not utils.VerifySha1Hmac(cds, _GetImportExportHandshakeMessage(version),
1528 hmac_digest, salt=hmac_salt):
1529 return "Hash didn't match, clusters don't share the same domain secret"
1531 if version != constants.RIE_VERSION:
1532 return ("Clusters don't have the same remote import/export protocol"
1533 " (local=%s, remote=%s)" %
1534 (constants.RIE_VERSION, version))
1539 def _GetRieDiskInfoMessage(disk_index, host, port, magic):
1540 """Returns the hashed text for import/export disk information.
1542 @type disk_index: number
1543 @param disk_index: Index of disk (included in hash)
1545 @param host: Hostname
1547 @param port: Daemon port
1549 @param magic: Magic value
1552 return "%s:%s:%s:%s" % (disk_index, host, port, magic)
def CheckRemoteExportDiskInfo(cds, disk_index, disk_info):
  """Verifies received disk information for an export.

  @type cds: string
  @param cds: Cluster domain secret
  @type disk_index: number
  @param disk_index: Index of disk (included in hash)
  @type disk_info: sequence
  @param disk_info: Disk information sent by remote peer
  @return: Tuple of (destination, validated port, magic)
  @raise errors.GenericError: If the data is malformed or the HMAC
    doesn't verify

  """
  try:
    (host, port, magic, hmac_digest, hmac_salt) = disk_info
  except (TypeError, ValueError), err:
    raise errors.GenericError("Invalid data: %s" % err)

  if not (host and port and magic):
    raise errors.GenericError("Missing destination host, port or magic")

  msg = _GetRieDiskInfoMessage(disk_index, host, port, magic)

  if not utils.VerifySha1Hmac(cds, msg, hmac_digest, salt=hmac_salt):
    raise errors.GenericError("HMAC is wrong")

  # Literal IP addresses are used as-is; hostnames are normalized
  if netutils.IP6Address.IsValid(host) or netutils.IP4Address.IsValid(host):
    destination = host
  else:
    destination = netutils.Hostname.GetNormalizedName(host)

  return (destination,
          utils.ValidateServiceName(port),
          magic)
def ComputeRemoteImportDiskInfo(cds, salt, disk_index, host, port, magic):
  """Computes the signed disk information for a remote import.

  @type cds: string
  @param cds: Cluster domain secret
  @type salt: string
  @param salt: HMAC salt
  @type disk_index: number
  @param disk_index: Index of disk (included in hash)
  @type host: string
  @param host: Hostname
  @type port: number
  @param port: Daemon port
  @type magic: string
  @param magic: Magic value
  @return: Tuple of (host, port, magic, HMAC digest, salt)

  """
  hashed_text = _GetRieDiskInfoMessage(disk_index, host, port, magic)
  digest = utils.Sha1Hmac(cds, hashed_text, salt=salt)
  return (host, port, magic, digest, salt)
def CalculateGroupIPolicy(cluster, group):
  """Calculate instance policy for group.

  Delegates to the cluster object, which fills the group's (partial)
  instance policy.

  """
  group_ipolicy = group.ipolicy
  return cluster.SimpleFillIPolicy(group_ipolicy)
def ComputeDiskSize(disk_template, disks):
  """Compute disk size requirements according to disk template

  @param disk_template: Disk template (one of C{constants.DT_*})
  @type disks: list of dicts
  @param disks: Disk definitions, each containing C{constants.IDISK_SIZE}
  @return: Required free disk space in MB, or C{None} for diskless
  @raise errors.ProgrammerError: If the disk template is unknown

  """
  # Total of all disk sizes; shared by several templates below.  Both
  # totals are computed eagerly, as in the original mapping.
  plain_size = sum(d[constants.IDISK_SIZE] for d in disks)
  # 128 MB are added for drbd metadata for each disk
  drbd_size = sum(d[constants.IDISK_SIZE] + constants.DRBD_META_SIZE
                  for d in disks)

  # Required free disk space as a function of disk and swap space
  req_size_dict = {
    constants.DT_DISKLESS: None,
    constants.DT_PLAIN: plain_size,
    constants.DT_DRBD8: drbd_size,
    constants.DT_FILE: plain_size,
    constants.DT_SHARED_FILE: plain_size,
    constants.DT_BLOCK: 0,
    constants.DT_RBD: plain_size,
    }

  if disk_template not in req_size_dict:
    raise errors.ProgrammerError("Disk template '%s' size requirement"
                                 " is unknown" % disk_template)

  return req_size_dict[disk_template]