4 # Copyright (C) 2010 Google Inc.
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 # General Public License for more details.
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
22 """Instance-related functions and classes for masterd.
29 from ganeti import constants
30 from ganeti import errors
31 from ganeti import compat
class _ImportExportError(Exception):
  """Local exception to report import/export errors.

  Raised by the import/export helpers of this module and caught by the
  import/export loop, which finalizes the affected transfer with the
  exception text as error message.

  """
class ImportExportTimeouts(object):
  """Set of timeouts (in seconds) applied to a single import/export.

  """
  #: Time until daemon starts writing status file
  DEFAULT_READY_TIMEOUT = 10

  #: Length of time until errors cause hard failure
  DEFAULT_ERROR_TIMEOUT = 10

  #: Time after which daemon must be listening
  DEFAULT_LISTEN_TIMEOUT = 10

  def __init__(self, connect,
               listen=DEFAULT_LISTEN_TIMEOUT,
               error=DEFAULT_ERROR_TIMEOUT,
               ready=DEFAULT_READY_TIMEOUT):
    """Initializes this class.

    @param connect: Timeout for establishing connection
    @param listen: Timeout for starting to listen for connections
    @param error: Length of time until errors cause hard failure
    @param ready: Timeout for daemon to become ready

    """
    # All four values must be kept: the import/export classes in this module
    # read "connect", "listen", "error" and "ready" from this object when
    # checking for expired timeouts.
    self.connect = connect
    self.listen = listen
    self.error = error
    self.ready = ready
class ImportExportCbBase(object):
  """Callbacks for disk import/export.

  The implementations in this base class do nothing; subclasses override
  the notifications they are interested in.

  """
  def ReportListening(self, ie, private):
    """Called when daemon started listening.

    @type ie: Subclass of L{_DiskImportExportBase}
    @param ie: Import/export object
    @param private: Private data passed to import/export object

    """

  def ReportConnected(self, ie, private):
    """Called when a connection has been established.

    @type ie: Subclass of L{_DiskImportExportBase}
    @param ie: Import/export object
    @param private: Private data passed to import/export object

    """

  def ReportFinished(self, ie, private):
    """Called when a transfer has finished.

    @type ie: Subclass of L{_DiskImportExportBase}
    @param ie: Import/export object
    @param private: Private data passed to import/export object

    """
def _TimeoutExpired(epoch, timeout, _time_fn=time.time):
  """Checks whether a timeout has expired.

  @param epoch: Point in time from which the timeout is measured
  @param timeout: Length of the timeout, in the same unit as C{epoch}
  @param _time_fn: Function returning the current time
  @return: True if the current time is strictly later than
    C{epoch + timeout}, False otherwise

  """
  return _time_fn() > (epoch + timeout)
118 class _DiskImportExportBase(object):
121 def __init__(self, lu, node_name, x509_key_name, remote_x509_ca,
122 instance, timeouts, cbs, private=None):
123 """Initializes this class.
125 @param lu: Logical unit instance
126 @type node_name: string
127 @param node_name: Node name for import
128 @type x509_key_name: string
129 @param x509_key_name: Name of X509 key (None for node daemon key)
130 @type remote_x509_ca: string
131 @param remote_x509_ca: Remote peer's CA (None for node daemon certificate)
132 @type instance: L{objects.Instance}
133 @param instance: Instance object
134 @type timeouts: L{ImportExportTimeouts}
135 @param timeouts: Timeouts for this import
136 @type cbs: L{ImportExportCbBase}
137 @param cbs: Callbacks
138 @param private: Private data for callback functions
141 assert self.MODE_TEXT
144 self.node_name = node_name
145 self._x509_key_name = x509_key_name
146 self._remote_x509_ca = remote_x509_ca
147 self._instance = instance
148 self._timeouts = timeouts
150 self._private = private
156 self._ts_begin = None
157 self._ts_connected = None
158 self._ts_finished = None
159 self._ts_cleanup = None
160 self._ts_last_error = None
164 self.final_message = None
167 self._daemon_name = None
171 def recent_output(self):
172 """Returns the most recent output from the daemon.
176 return self._daemon.recent_output
182 """Determines whether this transport is still active.
185 return self.success is None
189 """Returns parent loop.
191 @rtype: L{ImportExportLoop}
196 def SetLoop(self, loop):
197 """Sets the parent loop.
199 @type loop: L{ImportExportLoop}
203 raise errors.ProgrammerError("Loop can only be set once")
207 def _StartDaemon(self):
208 """Starts the import/export daemon.
211 raise NotImplementedError()
213 def CheckDaemon(self):
214 """Checks whether daemon has been started and if not, starts it.
220 assert self._ts_cleanup is None
222 if self._daemon_name is None:
223 assert self._ts_begin is None
225 result = self._StartDaemon()
227 raise _ImportExportError("Failed to start %s on %s: %s" %
228 (self.MODE_TEXT, self.node_name,
231 daemon_name = result.payload
233 logging.info("Started %s %r on %s", self.MODE_TEXT, daemon_name,
236 self._ts_begin = time.time()
237 self._daemon_name = daemon_name
239 return self._daemon_name
241 def GetDaemonName(self):
242 """Returns the daemon name.
245 assert self._daemon_name, "Daemon has not been started"
246 assert self._ts_cleanup is None
247 return self._daemon_name
250 """Sends SIGTERM to import/export daemon (if still active).
253 if self._daemon_name:
254 self._lu.LogWarning("Aborting %s %r on %s",
255 self.MODE_TEXT, self._daemon_name, self.node_name)
256 result = self._lu.rpc.call_impexp_abort(self.node_name, self._daemon_name)
258 self._lu.LogWarning("Failed to abort %s %r on %s: %s",
259 self.MODE_TEXT, self._daemon_name,
260 self.node_name, result.fail_msg)
265 def _SetDaemonData(self, data):
266 """Internal function for updating status daemon data.
268 @type data: L{objects.ImportExportStatus}
269 @param data: Daemon status data
272 assert self._ts_begin is not None
275 if _TimeoutExpired(self._ts_begin, self._timeouts.ready):
276 raise _ImportExportError("Didn't become ready after %s seconds" %
277 self._timeouts.ready)
285 def SetDaemonData(self, success, data):
286 """Updates daemon status data.
289 @param success: Whether fetching data was successful or not
290 @type data: L{objects.ImportExportStatus}
291 @param data: Daemon status data
295 if self._ts_last_error is None:
296 self._ts_last_error = time.time()
298 elif _TimeoutExpired(self._ts_last_error, self._timeouts.error):
299 raise _ImportExportError("Too many errors while updating data")
303 self._ts_last_error = None
305 return self._SetDaemonData(data)
307 def CheckListening(self):
308 """Checks whether the daemon is listening.
311 raise NotImplementedError()
313 def _GetConnectedCheckEpoch(self):
314 """Returns timeout to calculate connect timeout.
317 raise NotImplementedError()
319 def CheckConnected(self):
320 """Checks whether the daemon is connected.
323 @return: Whether the daemon is connected
326 assert self._daemon, "Daemon status missing"
328 if self._ts_connected is not None:
331 if self._daemon.connected:
332 self._ts_connected = time.time()
334 # TODO: Log remote peer
335 logging.debug("%s %r on %s is now connected",
336 self.MODE_TEXT, self._daemon_name, self.node_name)
338 self._cbs.ReportConnected(self, self._private)
342 if _TimeoutExpired(self._GetConnectedCheckEpoch(), self._timeouts.connect):
343 raise _ImportExportError("Not connected after %s seconds" %
344 self._timeouts.connect)
348 def CheckFinished(self):
349 """Checks whether the daemon exited.
352 @return: Whether the transfer is finished
355 assert self._daemon, "Daemon status missing"
357 if self._ts_finished:
360 if self._daemon.exit_status is None:
363 self._ts_finished = time.time()
365 self._ReportFinished(self._daemon.exit_status == 0,
366 self._daemon.error_message)
370 def _ReportFinished(self, success, message):
371 """Transfer is finished or daemon exited.
374 @param success: Whether the transfer was successful
375 @type message: string
376 @param message: Error message
379 assert self.success is None
381 self.success = success
382 self.final_message = message
385 logging.info("%s %r on %s succeeded", self.MODE_TEXT, self._daemon_name,
387 elif self._daemon_name:
388 self._lu.LogWarning("%s %r on %s failed: %s",
389 self.MODE_TEXT, self._daemon_name, self.node_name,
392 self._lu.LogWarning("%s on %s failed: %s", self.MODE_TEXT,
393 self.node_name, message)
395 self._cbs.ReportFinished(self, self._private)
398 """Makes the RPC call to finalize this import/export.
401 return self._lu.rpc.call_impexp_cleanup(self.node_name, self._daemon_name)
403 def Finalize(self, error=None):
404 """Finalizes this import/export.
407 assert error or self.success is not None
409 if self._daemon_name:
410 logging.info("Finalizing %s %r on %s",
411 self.MODE_TEXT, self._daemon_name, self.node_name)
413 result = self._Finalize()
415 self._lu.LogWarning("Failed to finalize %s %r on %s: %s",
416 self.MODE_TEXT, self._daemon_name,
417 self.node_name, result.fail_msg)
420 # Daemon is no longer running
421 self._daemon_name = None
422 self._ts_cleanup = time.time()
425 self._ReportFinished(False, error)
430 class DiskImport(_DiskImportExportBase):
433 def __init__(self, lu, node_name, x509_key_name, source_x509_ca, instance,
434 dest, dest_args, timeouts, cbs, private=None):
435 """Initializes this class.
437 @param lu: Logical unit instance
438 @type node_name: string
439 @param node_name: Node name for import
440 @type x509_key_name: string
441 @param x509_key_name: Name of X509 key (None for node daemon key)
442 @type source_x509_ca: string
443 @param source_x509_ca: Remote peer's CA (None for node daemon certificate)
444 @type instance: L{objects.Instance}
445 @param instance: Instance object
446 @param dest: I/O destination
447 @param dest_args: I/O arguments
448 @type timeouts: L{ImportExportTimeouts}
449 @param timeouts: Timeouts for this import
450 @type cbs: L{ImportExportCbBase}
451 @param cbs: Callbacks
452 @param private: Private data for callback functions
455 _DiskImportExportBase.__init__(self, lu, node_name,
456 x509_key_name, source_x509_ca,
457 instance, timeouts, cbs, private)
459 self._dest_args = dest_args
462 self._ts_listening = None
465 def listen_port(self):
466 """Returns the port the daemon is listening on.
470 return self._daemon.listen_port
474 def _StartDaemon(self):
475 """Starts the import daemon.
478 return self._lu.rpc.call_import_start(self.node_name,
480 self._remote_x509_ca, self._instance,
481 self._dest, self._dest_args)
483 def CheckListening(self):
484 """Checks whether the daemon is listening.
487 @return: Whether the daemon is listening
490 assert self._daemon, "Daemon status missing"
492 if self._ts_listening is not None:
495 port = self._daemon.listen_port
497 self._ts_listening = time.time()
499 logging.debug("Import %r on %s is now listening on port %s",
500 self._daemon_name, self.node_name, port)
502 self._cbs.ReportListening(self, self._private)
506 if _TimeoutExpired(self._ts_begin, self._timeouts.listen):
507 raise _ImportExportError("Not listening after %s seconds" %
508 self._timeouts.listen)
512 def _GetConnectedCheckEpoch(self):
513 """Returns the time since we started listening.
516 assert self._ts_listening is not None, \
517 ("Checking whether an import is connected is only useful"
518 " once it's been listening")
520 return self._ts_listening
523 class DiskExport(_DiskImportExportBase):
526 def __init__(self, lu, node_name, x509_key_name, dest_x509_ca,
527 dest_host, dest_port, instance, source, source_args,
528 timeouts, cbs, private=None):
529 """Initializes this class.
531 @param lu: Logical unit instance
532 @type node_name: string
533 @param node_name: Node name for import
534 @type x509_key_name: string
535 @param x509_key_name: Name of X509 key (None for node daemon key)
536 @type dest_x509_ca: string
537 @param dest_x509_ca: Remote peer's CA (None for node daemon certificate)
538 @type dest_host: string
539 @param dest_host: Destination host name or IP address
540 @type dest_port: number
541 @param dest_port: Destination port number
542 @type instance: L{objects.Instance}
543 @param instance: Instance object
544 @param source: I/O source
545 @param source_args: I/O source
546 @type timeouts: L{ImportExportTimeouts}
547 @param timeouts: Timeouts for this import
548 @type cbs: L{ImportExportCbBase}
549 @param cbs: Callbacks
550 @param private: Private data for callback functions
553 _DiskImportExportBase.__init__(self, lu, node_name,
554 x509_key_name, dest_x509_ca,
555 instance, timeouts, cbs, private)
556 self._dest_host = dest_host
557 self._dest_port = dest_port
558 self._source = source
559 self._source_args = source_args
561 def _StartDaemon(self):
562 """Starts the export daemon.
565 return self._lu.rpc.call_export_start(self.node_name, self._x509_key_name,
566 self._remote_x509_ca,
567 self._dest_host, self._dest_port,
568 self._instance, self._source,
571 def CheckListening(self):
572 """Checks whether the daemon is listening.
575 # Only an import can be listening
578 def _GetConnectedCheckEpoch(self):
579 """Returns the time since the daemon started.
582 assert self._ts_begin is not None
584 return self._ts_begin
587 class ImportExportLoop:
591 def __init__(self, lu):
592 """Initializes this class.
597 self._pending_add = []
599 def Add(self, diskie):
600 """Adds an import/export object to the loop.
602 @type diskie: Subclass of L{_DiskImportExportBase}
603 @param diskie: Import/export object
606 assert diskie not in self._pending_add
607 assert diskie.loop is None
611 # Adding new objects to a staging list is necessary, otherwise the main
612 # loop gets confused if callbacks modify the queue while the main loop is
614 self._pending_add.append(diskie)
617 def _CollectDaemonStatus(lu, daemons):
618 """Collects the status for all import/export daemons.
623 for node_name, names in daemons.iteritems():
624 result = lu.rpc.call_impexp_status(node_name, names)
626 lu.LogWarning("Failed to get daemon status on %s: %s",
627 node_name, result.fail_msg)
630 assert len(names) == len(result.payload)
632 daemon_status[node_name] = dict(zip(names, result.payload))
637 def _GetActiveDaemonNames(queue):
638 """Gets the names of all active daemons.
643 if not diskie.active:
647 # Start daemon if necessary
648 daemon_name = diskie.CheckDaemon()
649 except _ImportExportError, err:
650 logging.exception("%s failed", diskie.MODE_TEXT)
651 diskie.Finalize(error=str(err))
654 result.setdefault(diskie.node_name, []).append(daemon_name)
656 assert len(queue) >= len(result)
657 assert len(queue) >= sum([len(names) for names in result.itervalues()])
659 logging.debug("daemons=%r", result)
663 def _AddPendingToQueue(self):
664 """Adds all pending import/export objects to the internal queue.
667 assert compat.all(diskie not in self._queue and diskie.loop == self
668 for diskie in self._pending_add)
670 self._queue.extend(self._pending_add)
672 del self._pending_add[:]
675 """Utility main loop.
679 self._AddPendingToQueue()
681 # Collect all active daemon names
682 daemons = self._GetActiveDaemonNames(self._queue)
686 # Collection daemon status data
687 data = self._CollectDaemonStatus(self._lu, daemons)
690 delay = self.MAX_DELAY
691 for diskie in self._queue:
692 if not diskie.active:
697 all_daemon_data = data[diskie.node_name]
699 result = diskie.SetDaemonData(False, None)
702 diskie.SetDaemonData(True,
703 all_daemon_data[diskie.GetDaemonName()])
706 # Daemon not yet ready, retry soon
707 delay = min(3.0, delay)
710 if diskie.CheckFinished():
715 # Normal case: check again in 5 seconds
716 delay = min(5.0, delay)
718 if not diskie.CheckListening():
719 # Not yet listening, retry soon
720 delay = min(1.0, delay)
723 if not diskie.CheckConnected():
724 # Not yet connected, retry soon
725 delay = min(1.0, delay)
728 except _ImportExportError, err:
729 logging.exception("%s failed", diskie.MODE_TEXT)
730 diskie.Finalize(error=str(err))
732 if not compat.any([diskie.active for diskie in self._queue]):
736 delay = min(self.MAX_DELAY, max(self.MIN_DELAY, delay))
737 logging.debug("Waiting for %ss", delay)
740 def FinalizeAll(self):
741 """Finalizes all pending transfers.
746 for diskie in self._queue:
747 success = diskie.Finalize() and success
class _TransferInstCbBase(ImportExportCbBase):
  def __init__(self, lu, feedback_fn, instance, timeouts, src_node, src_cbs,
               dest_node, dest_ip):
    """Initializes this class.

    @param lu: Logical unit instance
    @param feedback_fn: Feedback function
    @param instance: Instance object
    @type timeouts: L{ImportExportTimeouts}
    @param timeouts: Timeouts passed on to import/export objects
    @param src_node: Source node name
    @param src_cbs: Callbacks handed to the export started on the source
      node (None for the source callbacks themselves)
    @param dest_node: Destination node name
    @param dest_ip: IP address of destination node

    """
    ImportExportCbBase.__init__(self)

    # The destination callbacks read self.lu when they start the export
    self.lu = lu
    self.feedback_fn = feedback_fn
    self.instance = instance
    self.timeouts = timeouts
    self.src_node = src_node
    self.src_cbs = src_cbs
    self.dest_node = dest_node
    self.dest_ip = dest_ip
770 class _TransferInstSourceCb(_TransferInstCbBase):
771 def ReportConnected(self, ie, dtp):
772 """Called when a connection has been established.
775 assert self.src_cbs is None
776 assert dtp.src_export == ie
777 assert dtp.dest_import
779 self.feedback_fn("%s is sending data on %s" %
780 (dtp.data.name, ie.node_name))
782 def ReportFinished(self, ie, dtp):
783 """Called when a transfer has finished.
786 assert self.src_cbs is None
787 assert dtp.src_export == ie
788 assert dtp.dest_import
791 self.feedback_fn("%s finished sending data" % dtp.data.name)
793 self.feedback_fn("%s failed to send data: %s (recent output: %r)" %
794 (dtp.data.name, ie.final_message, ie.recent_output))
796 dtp.RecordResult(ie.success)
798 cb = dtp.data.finished_fn
802 # TODO: Check whether sending SIGTERM right away is okay, maybe we should
803 # give the daemon a moment to sort things out
804 if dtp.dest_import and not ie.success:
805 dtp.dest_import.Abort()
808 class _TransferInstDestCb(_TransferInstCbBase):
809 def ReportListening(self, ie, dtp):
810 """Called when daemon started listening.
814 assert dtp.src_export is None
815 assert dtp.dest_import
817 self.feedback_fn("%s is now listening, starting export" % dtp.data.name)
819 # Start export on source node
820 de = DiskExport(self.lu, self.src_node, None, None, self.dest_ip,
821 ie.listen_port, self.instance,
822 dtp.data.src_io, dtp.data.src_ioargs,
823 self.timeouts, self.src_cbs, private=dtp)
828 def ReportConnected(self, ie, dtp):
829 """Called when a connection has been established.
832 self.feedback_fn("%s is receiving data on %s" %
833 (dtp.data.name, self.dest_node))
835 def ReportFinished(self, ie, dtp):
836 """Called when a transfer has finished.
840 self.feedback_fn("%s finished receiving data" % dtp.data.name)
842 self.feedback_fn("%s failed to receive data: %s (recent output: %r)" %
843 (dtp.data.name, ie.final_message, ie.recent_output))
845 dtp.RecordResult(ie.success)
847 # TODO: Check whether sending SIGTERM right away is okay, maybe we should
848 # give the daemon a moment to sort things out
849 if dtp.src_export and not ie.success:
850 dtp.src_export.Abort()
class DiskTransfer(object):
  def __init__(self, name, src_io, src_ioargs, dest_io, dest_ioargs,
               finished_fn):
    """Initializes this class.

    @param name: User-visible name for this transfer (e.g. "disk/0")
    @param src_io: Source I/O type
    @param src_ioargs: Source I/O arguments
    @param dest_io: Destination I/O type
    @param dest_ioargs: Destination I/O arguments
    @type finished_fn: callable
    @param finished_fn: Function called once transfer has finished

    """
    # "name" and "src_io" are read by the transfer callbacks and by
    # TransferInstanceData, so they have to be stored like the other values
    self.name = name

    self.src_io = src_io
    self.src_ioargs = src_ioargs

    self.dest_io = dest_io
    self.dest_ioargs = dest_ioargs

    self.finished_fn = finished_fn
class _DiskTransferPrivate(object):
  def __init__(self, data, success):
    """Initializes this class.

    @type data: L{DiskTransfer}
    @param data: Transfer description this state belongs to
    @param success: Initial status of the transfer

    """
    # Read by the transfer callbacks as "dtp.data"
    self.data = data

    # Filled in later with the export/import objects of this transfer
    self.src_export = None
    self.dest_import = None

    self.success = success

  def RecordResult(self, success):
    """Updates the status.

    One failed part will cause the whole transfer to fail.

    @param success: Result of one part of the transfer

    """
    self.success = self.success and success
903 def TransferInstanceData(lu, feedback_fn, src_node, dest_node, dest_ip,
904 instance, all_transfers):
905 """Transfers an instance's data from one node to another.
907 @param lu: Logical unit instance
908 @param feedback_fn: Feedback function
909 @type src_node: string
910 @param src_node: Source node name
911 @type dest_node: string
912 @param dest_node: Destination node name
913 @type dest_ip: string
914 @param dest_ip: IP address of destination node
915 @type instance: L{objects.Instance}
916 @param instance: Instance object
917 @type all_transfers: list of L{DiskTransfer} instances
918 @param all_transfers: List of all disk transfers to be made
920 @return: List with a boolean (True=successful, False=failed) for success for
924 timeouts = ImportExportTimeouts(constants.DISK_TRANSFER_CONNECT_TIMEOUT)
925 src_cbs = _TransferInstSourceCb(lu, feedback_fn, instance, timeouts,
926 src_node, None, dest_node, dest_ip)
927 dest_cbs = _TransferInstDestCb(lu, feedback_fn, instance, timeouts,
928 src_node, src_cbs, dest_node, dest_ip)
932 ieloop = ImportExportLoop(lu)
934 for transfer in all_transfers:
936 feedback_fn("Exporting %s from %s to %s" %
937 (transfer.name, src_node, dest_node))
939 dtp = _DiskTransferPrivate(transfer, True)
941 di = DiskImport(lu, dest_node, None, None, instance,
942 transfer.dest_io, transfer.dest_ioargs,
943 timeouts, dest_cbs, private=dtp)
948 dtp = _DiskTransferPrivate(None, False)
956 assert len(all_dtp) == len(all_transfers)
957 assert compat.all([(dtp.src_export is None or
958 dtp.src_export.success is not None) and
959 (dtp.dest_import is None or
960 dtp.dest_import.success is not None)
961 for dtp in all_dtp]), \
962 "Not all imports/exports are finalized"
964 return [bool(dtp.success) for dtp in all_dtp]