cmdlib: Add utility to transfer instance data within the cluster
[ganeti-local] / lib / masterd / instance.py
1 #
2 #
3
4 # Copyright (C) 2010 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Instance-related functions and classes for masterd.
23
24 """
25
26 import logging
27 import time
28
29 from ganeti import constants
30 from ganeti import errors
31 from ganeti import compat
32
33
34 class _ImportExportError(Exception):
35   """Local exception to report import/export errors.
36
37   """
38
39
40 class ImportExportTimeouts(object):
41   #: Time until daemon starts writing status file
42   DEFAULT_READY_TIMEOUT = 10
43
44   #: Length of time until errors cause hard failure
45   DEFAULT_ERROR_TIMEOUT = 10
46
47   #: Time after which daemon must be listening
48   DEFAULT_LISTEN_TIMEOUT = 10
49
50   __slots__ = [
51     "error",
52     "ready",
53     "listen",
54     "connect",
55     ]
56
57   def __init__(self, connect,
58                listen=DEFAULT_LISTEN_TIMEOUT,
59                error=DEFAULT_ERROR_TIMEOUT,
60                ready=DEFAULT_READY_TIMEOUT):
61     """Initializes this class.
62
63     @type connect: number
64     @param connect: Timeout for establishing connection
65     @type listen: number
66     @param listen: Timeout for starting to listen for connections
67     @type error: number
68     @param error: Length of time until errors cause hard failure
69     @type ready: number
70     @param ready: Timeout for daemon to become ready
71
72     """
73     self.error = error
74     self.ready = ready
75     self.listen = listen
76     self.connect = connect
77
78
79 class ImportExportCbBase(object):
80   """Callbacks for disk import/export.
81
82   """
83   def ReportListening(self, ie, private):
84     """Called when daemon started listening.
85
86     @type ie: Subclass of L{_DiskImportExportBase}
87     @param ie: Import/export object
88     @param private: Private data passed to import/export object
89
90     """
91
92   def ReportConnected(self, ie, private):
93     """Called when a connection has been established.
94
95     @type ie: Subclass of L{_DiskImportExportBase}
96     @param ie: Import/export object
97     @param private: Private data passed to import/export object
98
99     """
100
101   def ReportFinished(self, ie, private):
102     """Called when a transfer has finished.
103
104     @type ie: Subclass of L{_DiskImportExportBase}
105     @param ie: Import/export object
106     @param private: Private data passed to import/export object
107
108     """
109
110
111 def _TimeoutExpired(epoch, timeout, _time_fn=time.time):
112   """Checks whether a timeout has expired.
113
114   """
115   return _time_fn() > (epoch + timeout)
116
117
118 class _DiskImportExportBase(object):
119   MODE_TEXT = None
120
121   def __init__(self, lu, node_name, x509_key_name, remote_x509_ca,
122                instance, timeouts, cbs, private=None):
123     """Initializes this class.
124
125     @param lu: Logical unit instance
126     @type node_name: string
127     @param node_name: Node name for import
128     @type x509_key_name: string
129     @param x509_key_name: Name of X509 key (None for node daemon key)
130     @type remote_x509_ca: string
131     @param remote_x509_ca: Remote peer's CA (None for node daemon certificate)
132     @type instance: L{objects.Instance}
133     @param instance: Instance object
134     @type timeouts: L{ImportExportTimeouts}
135     @param timeouts: Timeouts for this import
136     @type cbs: L{ImportExportCbBase}
137     @param cbs: Callbacks
138     @param private: Private data for callback functions
139
140     """
141     assert self.MODE_TEXT
142
143     self._lu = lu
144     self.node_name = node_name
145     self._x509_key_name = x509_key_name
146     self._remote_x509_ca = remote_x509_ca
147     self._instance = instance
148     self._timeouts = timeouts
149     self._cbs = cbs
150     self._private = private
151
152     # Parent loop
153     self._loop = None
154
155     # Timestamps
156     self._ts_begin = None
157     self._ts_connected = None
158     self._ts_finished = None
159     self._ts_cleanup = None
160     self._ts_last_error = None
161
162     # Transfer status
163     self.success = None
164     self.final_message = None
165
166     # Daemon status
167     self._daemon_name = None
168     self._daemon = None
169
170   @property
171   def recent_output(self):
172     """Returns the most recent output from the daemon.
173
174     """
175     if self._daemon:
176       return self._daemon.recent_output
177
178     return None
179
180   @property
181   def active(self):
182     """Determines whether this transport is still active.
183
184     """
185     return self.success is None
186
187   @property
188   def loop(self):
189     """Returns parent loop.
190
191     @rtype: L{ImportExportLoop}
192
193     """
194     return self._loop
195
196   def SetLoop(self, loop):
197     """Sets the parent loop.
198
199     @type loop: L{ImportExportLoop}
200
201     """
202     if self._loop:
203       raise errors.ProgrammerError("Loop can only be set once")
204
205     self._loop = loop
206
207   def _StartDaemon(self):
208     """Starts the import/export daemon.
209
210     """
211     raise NotImplementedError()
212
213   def CheckDaemon(self):
214     """Checks whether daemon has been started and if not, starts it.
215
216     @rtype: string
217     @return: Daemon name
218
219     """
220     assert self._ts_cleanup is None
221
222     if self._daemon_name is None:
223       assert self._ts_begin is None
224
225       result = self._StartDaemon()
226       if result.fail_msg:
227         raise _ImportExportError("Failed to start %s on %s: %s" %
228                                  (self.MODE_TEXT, self.node_name,
229                                   result.fail_msg))
230
231       daemon_name = result.payload
232
233       logging.info("Started %s %r on %s", self.MODE_TEXT, daemon_name,
234                    self.node_name)
235
236       self._ts_begin = time.time()
237       self._daemon_name = daemon_name
238
239     return self._daemon_name
240
241   def GetDaemonName(self):
242     """Returns the daemon name.
243
244     """
245     assert self._daemon_name, "Daemon has not been started"
246     assert self._ts_cleanup is None
247     return self._daemon_name
248
249   def Abort(self):
250     """Sends SIGTERM to import/export daemon (if still active).
251
252     """
253     if self._daemon_name:
254       self._lu.LogWarning("Aborting %s %r on %s",
255                           self.MODE_TEXT, self._daemon_name, self.node_name)
256       result = self._lu.rpc.call_impexp_abort(self.node_name, self._daemon_name)
257       if result.fail_msg:
258         self._lu.LogWarning("Failed to abort %s %r on %s: %s",
259                             self.MODE_TEXT, self._daemon_name,
260                             self.node_name, result.fail_msg)
261         return False
262
263     return True
264
265   def _SetDaemonData(self, data):
266     """Internal function for updating status daemon data.
267
268     @type data: L{objects.ImportExportStatus}
269     @param data: Daemon status data
270
271     """
272     assert self._ts_begin is not None
273
274     if not data:
275       if _TimeoutExpired(self._ts_begin, self._timeouts.ready):
276         raise _ImportExportError("Didn't become ready after %s seconds" %
277                                  self._timeouts.ready)
278
279       return False
280
281     self._daemon = data
282
283     return True
284
285   def SetDaemonData(self, success, data):
286     """Updates daemon status data.
287
288     @type success: bool
289     @param success: Whether fetching data was successful or not
290     @type data: L{objects.ImportExportStatus}
291     @param data: Daemon status data
292
293     """
294     if not success:
295       if self._ts_last_error is None:
296         self._ts_last_error = time.time()
297
298       elif _TimeoutExpired(self._ts_last_error, self._timeouts.error):
299         raise _ImportExportError("Too many errors while updating data")
300
301       return False
302
303     self._ts_last_error = None
304
305     return self._SetDaemonData(data)
306
307   def CheckListening(self):
308     """Checks whether the daemon is listening.
309
310     """
311     raise NotImplementedError()
312
313   def _GetConnectedCheckEpoch(self):
314     """Returns timeout to calculate connect timeout.
315
316     """
317     raise NotImplementedError()
318
319   def CheckConnected(self):
320     """Checks whether the daemon is connected.
321
322     @rtype: bool
323     @return: Whether the daemon is connected
324
325     """
326     assert self._daemon, "Daemon status missing"
327
328     if self._ts_connected is not None:
329       return True
330
331     if self._daemon.connected:
332       self._ts_connected = time.time()
333
334       # TODO: Log remote peer
335       logging.debug("%s %r on %s is now connected",
336                     self.MODE_TEXT, self._daemon_name, self.node_name)
337
338       self._cbs.ReportConnected(self, self._private)
339
340       return True
341
342     if _TimeoutExpired(self._GetConnectedCheckEpoch(), self._timeouts.connect):
343       raise _ImportExportError("Not connected after %s seconds" %
344                                self._timeouts.connect)
345
346     return False
347
348   def CheckFinished(self):
349     """Checks whether the daemon exited.
350
351     @rtype: bool
352     @return: Whether the transfer is finished
353
354     """
355     assert self._daemon, "Daemon status missing"
356
357     if self._ts_finished:
358       return True
359
360     if self._daemon.exit_status is None:
361       return False
362
363     self._ts_finished = time.time()
364
365     self._ReportFinished(self._daemon.exit_status == 0,
366                          self._daemon.error_message)
367
368     return True
369
370   def _ReportFinished(self, success, message):
371     """Transfer is finished or daemon exited.
372
373     @type success: bool
374     @param success: Whether the transfer was successful
375     @type message: string
376     @param message: Error message
377
378     """
379     assert self.success is None
380
381     self.success = success
382     self.final_message = message
383
384     if success:
385       logging.info("%s %r on %s succeeded", self.MODE_TEXT, self._daemon_name,
386                    self.node_name)
387     elif self._daemon_name:
388       self._lu.LogWarning("%s %r on %s failed: %s",
389                           self.MODE_TEXT, self._daemon_name, self.node_name,
390                           message)
391     else:
392       self._lu.LogWarning("%s on %s failed: %s", self.MODE_TEXT,
393                           self.node_name, message)
394
395     self._cbs.ReportFinished(self, self._private)
396
397   def _Finalize(self):
398     """Makes the RPC call to finalize this import/export.
399
400     """
401     return self._lu.rpc.call_impexp_cleanup(self.node_name, self._daemon_name)
402
403   def Finalize(self, error=None):
404     """Finalizes this import/export.
405
406     """
407     assert error or self.success is not None
408
409     if self._daemon_name:
410       logging.info("Finalizing %s %r on %s",
411                    self.MODE_TEXT, self._daemon_name, self.node_name)
412
413       result = self._Finalize()
414       if result.fail_msg:
415         self._lu.LogWarning("Failed to finalize %s %r on %s: %s",
416                             self.MODE_TEXT, self._daemon_name,
417                             self.node_name, result.fail_msg)
418         return False
419
420       # Daemon is no longer running
421       self._daemon_name = None
422       self._ts_cleanup = time.time()
423
424     if error:
425       self._ReportFinished(False, error)
426
427     return True
428
429
430 class DiskImport(_DiskImportExportBase):
431   MODE_TEXT = "import"
432
433   def __init__(self, lu, node_name, x509_key_name, source_x509_ca, instance,
434                dest, dest_args, timeouts, cbs, private=None):
435     """Initializes this class.
436
437     @param lu: Logical unit instance
438     @type node_name: string
439     @param node_name: Node name for import
440     @type x509_key_name: string
441     @param x509_key_name: Name of X509 key (None for node daemon key)
442     @type source_x509_ca: string
443     @param source_x509_ca: Remote peer's CA (None for node daemon certificate)
444     @type instance: L{objects.Instance}
445     @param instance: Instance object
446     @param dest: I/O destination
447     @param dest_args: I/O arguments
448     @type timeouts: L{ImportExportTimeouts}
449     @param timeouts: Timeouts for this import
450     @type cbs: L{ImportExportCbBase}
451     @param cbs: Callbacks
452     @param private: Private data for callback functions
453
454     """
455     _DiskImportExportBase.__init__(self, lu, node_name,
456                                    x509_key_name, source_x509_ca,
457                                    instance, timeouts, cbs, private)
458     self._dest = dest
459     self._dest_args = dest_args
460
461     # Timestamps
462     self._ts_listening = None
463
464   @property
465   def listen_port(self):
466     """Returns the port the daemon is listening on.
467
468     """
469     if self._daemon:
470       return self._daemon.listen_port
471
472     return None
473
474   def _StartDaemon(self):
475     """Starts the import daemon.
476
477     """
478     return self._lu.rpc.call_import_start(self.node_name,
479                                           self._x509_key_name,
480                                           self._remote_x509_ca, self._instance,
481                                           self._dest, self._dest_args)
482
483   def CheckListening(self):
484     """Checks whether the daemon is listening.
485
486     @rtype: bool
487     @return: Whether the daemon is listening
488
489     """
490     assert self._daemon, "Daemon status missing"
491
492     if self._ts_listening is not None:
493       return True
494
495     port = self._daemon.listen_port
496     if port is not None:
497       self._ts_listening = time.time()
498
499       logging.debug("Import %r on %s is now listening on port %s",
500                     self._daemon_name, self.node_name, port)
501
502       self._cbs.ReportListening(self, self._private)
503
504       return True
505
506     if _TimeoutExpired(self._ts_begin, self._timeouts.listen):
507       raise _ImportExportError("Not listening after %s seconds" %
508                                self._timeouts.listen)
509
510     return False
511
512   def _GetConnectedCheckEpoch(self):
513     """Returns the time since we started listening.
514
515     """
516     assert self._ts_listening is not None, \
517            ("Checking whether an import is connected is only useful"
518             " once it's been listening")
519
520     return self._ts_listening
521
522
523 class DiskExport(_DiskImportExportBase):
524   MODE_TEXT = "export"
525
526   def __init__(self, lu, node_name, x509_key_name, dest_x509_ca,
527                dest_host, dest_port, instance, source, source_args,
528                timeouts, cbs, private=None):
529     """Initializes this class.
530
531     @param lu: Logical unit instance
532     @type node_name: string
533     @param node_name: Node name for import
534     @type x509_key_name: string
535     @param x509_key_name: Name of X509 key (None for node daemon key)
536     @type dest_x509_ca: string
537     @param dest_x509_ca: Remote peer's CA (None for node daemon certificate)
538     @type dest_host: string
539     @param dest_host: Destination host name or IP address
540     @type dest_port: number
541     @param dest_port: Destination port number
542     @type instance: L{objects.Instance}
543     @param instance: Instance object
544     @param source: I/O source
545     @param source_args: I/O source
546     @type timeouts: L{ImportExportTimeouts}
547     @param timeouts: Timeouts for this import
548     @type cbs: L{ImportExportCbBase}
549     @param cbs: Callbacks
550     @param private: Private data for callback functions
551
552     """
553     _DiskImportExportBase.__init__(self, lu, node_name,
554                                    x509_key_name, dest_x509_ca,
555                                    instance, timeouts, cbs, private)
556     self._dest_host = dest_host
557     self._dest_port = dest_port
558     self._source = source
559     self._source_args = source_args
560
561   def _StartDaemon(self):
562     """Starts the export daemon.
563
564     """
565     return self._lu.rpc.call_export_start(self.node_name, self._x509_key_name,
566                                           self._remote_x509_ca,
567                                           self._dest_host, self._dest_port,
568                                           self._instance, self._source,
569                                           self._source_args)
570
571   def CheckListening(self):
572     """Checks whether the daemon is listening.
573
574     """
575     # Only an import can be listening
576     return True
577
578   def _GetConnectedCheckEpoch(self):
579     """Returns the time since the daemon started.
580
581     """
582     assert self._ts_begin is not None
583
584     return self._ts_begin
585
586
587 class ImportExportLoop:
588   MIN_DELAY = 1.0
589   MAX_DELAY = 20.0
590
591   def __init__(self, lu):
592     """Initializes this class.
593
594     """
595     self._lu = lu
596     self._queue = []
597     self._pending_add = []
598
599   def Add(self, diskie):
600     """Adds an import/export object to the loop.
601
602     @type diskie: Subclass of L{_DiskImportExportBase}
603     @param diskie: Import/export object
604
605     """
606     assert diskie not in self._pending_add
607     assert diskie.loop is None
608
609     diskie.SetLoop(self)
610
611     # Adding new objects to a staging list is necessary, otherwise the main
612     # loop gets confused if callbacks modify the queue while the main loop is
613     # iterating over it.
614     self._pending_add.append(diskie)
615
616   @staticmethod
617   def _CollectDaemonStatus(lu, daemons):
618     """Collects the status for all import/export daemons.
619
620     """
621     daemon_status = {}
622
623     for node_name, names in daemons.iteritems():
624       result = lu.rpc.call_impexp_status(node_name, names)
625       if result.fail_msg:
626         lu.LogWarning("Failed to get daemon status on %s: %s",
627                       node_name, result.fail_msg)
628         continue
629
630       assert len(names) == len(result.payload)
631
632       daemon_status[node_name] = dict(zip(names, result.payload))
633
634     return daemon_status
635
636   @staticmethod
637   def _GetActiveDaemonNames(queue):
638     """Gets the names of all active daemons.
639
640     """
641     result = {}
642     for diskie in queue:
643       if not diskie.active:
644         continue
645
646       try:
647         # Start daemon if necessary
648         daemon_name = diskie.CheckDaemon()
649       except _ImportExportError, err:
650         logging.exception("%s failed", diskie.MODE_TEXT)
651         diskie.Finalize(error=str(err))
652         continue
653
654       result.setdefault(diskie.node_name, []).append(daemon_name)
655
656     assert len(queue) >= len(result)
657     assert len(queue) >= sum([len(names) for names in result.itervalues()])
658
659     logging.debug("daemons=%r", result)
660
661     return result
662
663   def _AddPendingToQueue(self):
664     """Adds all pending import/export objects to the internal queue.
665
666     """
667     assert compat.all(diskie not in self._queue and diskie.loop == self
668                       for diskie in self._pending_add)
669
670     self._queue.extend(self._pending_add)
671
672     del self._pending_add[:]
673
674   def Run(self):
675     """Utility main loop.
676
677     """
678     while True:
679       self._AddPendingToQueue()
680
681       # Collect all active daemon names
682       daemons = self._GetActiveDaemonNames(self._queue)
683       if not daemons:
684         break
685
686       # Collection daemon status data
687       data = self._CollectDaemonStatus(self._lu, daemons)
688
689       # Use data
690       delay = self.MAX_DELAY
691       for diskie in self._queue:
692         if not diskie.active:
693           continue
694
695         try:
696           try:
697             all_daemon_data = data[diskie.node_name]
698           except KeyError:
699             result = diskie.SetDaemonData(False, None)
700           else:
701             result = \
702               diskie.SetDaemonData(True,
703                                    all_daemon_data[diskie.GetDaemonName()])
704
705           if not result:
706             # Daemon not yet ready, retry soon
707             delay = min(3.0, delay)
708             continue
709
710           if diskie.CheckFinished():
711             # Transfer finished
712             diskie.Finalize()
713             continue
714
715           # Normal case: check again in 5 seconds
716           delay = min(5.0, delay)
717
718           if not diskie.CheckListening():
719             # Not yet listening, retry soon
720             delay = min(1.0, delay)
721             continue
722
723           if not diskie.CheckConnected():
724             # Not yet connected, retry soon
725             delay = min(1.0, delay)
726             continue
727
728         except _ImportExportError, err:
729           logging.exception("%s failed", diskie.MODE_TEXT)
730           diskie.Finalize(error=str(err))
731
732       if not compat.any([diskie.active for diskie in self._queue]):
733         break
734
735       # Wait a bit
736       delay = min(self.MAX_DELAY, max(self.MIN_DELAY, delay))
737       logging.debug("Waiting for %ss", delay)
738       time.sleep(delay)
739
740   def FinalizeAll(self):
741     """Finalizes all pending transfers.
742
743     """
744     success = True
745
746     for diskie in self._queue:
747       success = diskie.Finalize() and success
748
749     return success
750
751
752 class _TransferInstCbBase(ImportExportCbBase):
753   def __init__(self, lu, feedback_fn, instance, timeouts, src_node, src_cbs,
754                dest_node, dest_ip):
755     """Initializes this class.
756
757     """
758     ImportExportCbBase.__init__(self)
759
760     self.lu = lu
761     self.feedback_fn = feedback_fn
762     self.instance = instance
763     self.timeouts = timeouts
764     self.src_node = src_node
765     self.src_cbs = src_cbs
766     self.dest_node = dest_node
767     self.dest_ip = dest_ip
768
769
770 class _TransferInstSourceCb(_TransferInstCbBase):
771   def ReportConnected(self, ie, dtp):
772     """Called when a connection has been established.
773
774     """
775     assert self.src_cbs is None
776     assert dtp.src_export == ie
777     assert dtp.dest_import
778
779     self.feedback_fn("%s is sending data on %s" %
780                      (dtp.data.name, ie.node_name))
781
782   def ReportFinished(self, ie, dtp):
783     """Called when a transfer has finished.
784
785     """
786     assert self.src_cbs is None
787     assert dtp.src_export == ie
788     assert dtp.dest_import
789
790     if ie.success:
791       self.feedback_fn("%s finished sending data" % dtp.data.name)
792     else:
793       self.feedback_fn("%s failed to send data: %s (recent output: %r)" %
794                        (dtp.data.name, ie.final_message, ie.recent_output))
795
796     dtp.RecordResult(ie.success)
797
798     cb = dtp.data.finished_fn
799     if cb:
800       cb()
801
802     # TODO: Check whether sending SIGTERM right away is okay, maybe we should
803     # give the daemon a moment to sort things out
804     if dtp.dest_import and not ie.success:
805       dtp.dest_import.Abort()
806
807
808 class _TransferInstDestCb(_TransferInstCbBase):
809   def ReportListening(self, ie, dtp):
810     """Called when daemon started listening.
811
812     """
813     assert self.src_cbs
814     assert dtp.src_export is None
815     assert dtp.dest_import
816
817     self.feedback_fn("%s is now listening, starting export" % dtp.data.name)
818
819     # Start export on source node
820     de = DiskExport(self.lu, self.src_node, None, None, self.dest_ip,
821                     ie.listen_port, self.instance,
822                     dtp.data.src_io, dtp.data.src_ioargs,
823                     self.timeouts, self.src_cbs, private=dtp)
824     ie.loop.Add(de)
825
826     dtp.src_export = de
827
828   def ReportConnected(self, ie, dtp):
829     """Called when a connection has been established.
830
831     """
832     self.feedback_fn("%s is receiving data on %s" %
833                      (dtp.data.name, self.dest_node))
834
835   def ReportFinished(self, ie, dtp):
836     """Called when a transfer has finished.
837
838     """
839     if ie.success:
840       self.feedback_fn("%s finished receiving data" % dtp.data.name)
841     else:
842       self.feedback_fn("%s failed to receive data: %s (recent output: %r)" %
843                        (dtp.data.name, ie.final_message, ie.recent_output))
844
845     dtp.RecordResult(ie.success)
846
847     # TODO: Check whether sending SIGTERM right away is okay, maybe we should
848     # give the daemon a moment to sort things out
849     if dtp.src_export and not ie.success:
850       dtp.src_export.Abort()
851
852
853 class DiskTransfer(object):
854   def __init__(self, name, src_io, src_ioargs, dest_io, dest_ioargs,
855                finished_fn):
856     """Initializes this class.
857
858     @type name: string
859     @param name: User-visible name for this transfer (e.g. "disk/0")
860     @param src_io: Source I/O type
861     @param src_ioargs: Source I/O arguments
862     @param dest_io: Destination I/O type
863     @param dest_ioargs: Destination I/O arguments
864     @type finished_fn: callable
865     @param finished_fn: Function called once transfer has finished
866
867     """
868     self.name = name
869
870     self.src_io = src_io
871     self.src_ioargs = src_ioargs
872
873     self.dest_io = dest_io
874     self.dest_ioargs = dest_ioargs
875
876     self.finished_fn = finished_fn
877
878
879 class _DiskTransferPrivate(object):
880   def __init__(self, data, success):
881     """Initializes this class.
882
883     @type data: L{DiskTransfer}
884     @type success: bool
885
886     """
887     self.data = data
888
889     self.src_export = None
890     self.dest_import = None
891
892     self.success = success
893
894   def RecordResult(self, success):
895     """Updates the status.
896
897     One failed part will cause the whole transfer to fail.
898
899     """
900     self.success = self.success and success
901
902
903 def TransferInstanceData(lu, feedback_fn, src_node, dest_node, dest_ip,
904                          instance, all_transfers):
905   """Transfers an instance's data from one node to another.
906
907   @param lu: Logical unit instance
908   @param feedback_fn: Feedback function
909   @type src_node: string
910   @param src_node: Source node name
911   @type dest_node: string
912   @param dest_node: Destination node name
913   @type dest_ip: string
914   @param dest_ip: IP address of destination node
915   @type instance: L{objects.Instance}
916   @param instance: Instance object
917   @type all_transfers: list of L{DiskTransfer} instances
918   @param all_transfers: List of all disk transfers to be made
919   @rtype: list
920   @return: List with a boolean (True=successful, False=failed) for success for
921            each transfer
922
923   """
924   timeouts = ImportExportTimeouts(constants.DISK_TRANSFER_CONNECT_TIMEOUT)
925   src_cbs = _TransferInstSourceCb(lu, feedback_fn, instance, timeouts,
926                                   src_node, None, dest_node, dest_ip)
927   dest_cbs = _TransferInstDestCb(lu, feedback_fn, instance, timeouts,
928                                  src_node, src_cbs, dest_node, dest_ip)
929
930   all_dtp = []
931
932   ieloop = ImportExportLoop(lu)
933   try:
934     for transfer in all_transfers:
935       if transfer:
936         feedback_fn("Exporting %s from %s to %s" %
937                     (transfer.name, src_node, dest_node))
938
939         dtp = _DiskTransferPrivate(transfer, True)
940
941         di = DiskImport(lu, dest_node, None, None, instance,
942                         transfer.dest_io, transfer.dest_ioargs,
943                         timeouts, dest_cbs, private=dtp)
944         ieloop.Add(di)
945
946         dtp.dest_import = di
947       else:
948         dtp = _DiskTransferPrivate(None, False)
949
950       all_dtp.append(dtp)
951
952     ieloop.Run()
953   finally:
954     ieloop.FinalizeAll()
955
956   assert len(all_dtp) == len(all_transfers)
957   assert compat.all([(dtp.src_export is None or
958                       dtp.src_export.success is not None) and
959                      (dtp.dest_import is None or
960                       dtp.dest_import.success is not None)
961                      for dtp in all_dtp]), \
962          "Not all imports/exports are finalized"
963
964   return [bool(dtp.success) for dtp in all_dtp]