import/export: Show progress updates to user
[ganeti-local] / lib / masterd / instance.py
1 #
2 #
3
4 # Copyright (C) 2010 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Instance-related functions and classes for masterd.
23
24 """
25
26 import logging
27 import time
28 import OpenSSL
29
30 from ganeti import constants
31 from ganeti import errors
32 from ganeti import compat
33 from ganeti import utils
34 from ganeti import objects
35
36
37 class _ImportExportError(Exception):
38   """Local exception to report import/export errors.
39
40   """
41
42
43 class ImportExportTimeouts(object):
44   #: Time until daemon starts writing status file
45   DEFAULT_READY_TIMEOUT = 10
46
47   #: Length of time until errors cause hard failure
48   DEFAULT_ERROR_TIMEOUT = 10
49
50   #: Time after which daemon must be listening
51   DEFAULT_LISTEN_TIMEOUT = 10
52
53   #: Progress update interval
54   DEFAULT_PROGRESS_INTERVAL = 60
55
56   __slots__ = [
57     "error",
58     "ready",
59     "listen",
60     "connect",
61     "progress",
62     ]
63
64   def __init__(self, connect,
65                listen=DEFAULT_LISTEN_TIMEOUT,
66                error=DEFAULT_ERROR_TIMEOUT,
67                ready=DEFAULT_READY_TIMEOUT,
68                progress=DEFAULT_PROGRESS_INTERVAL):
69     """Initializes this class.
70
71     @type connect: number
72     @param connect: Timeout for establishing connection
73     @type listen: number
74     @param listen: Timeout for starting to listen for connections
75     @type error: number
76     @param error: Length of time until errors cause hard failure
77     @type ready: number
78     @param ready: Timeout for daemon to become ready
79     @type progress: number
80     @param progress: Progress update interval
81
82     """
83     self.error = error
84     self.ready = ready
85     self.listen = listen
86     self.connect = connect
87     self.progress = progress
88
89
90 class ImportExportCbBase(object):
91   """Callbacks for disk import/export.
92
93   """
94   def ReportListening(self, ie, private):
95     """Called when daemon started listening.
96
97     @type ie: Subclass of L{_DiskImportExportBase}
98     @param ie: Import/export object
99     @param private: Private data passed to import/export object
100
101     """
102
103   def ReportConnected(self, ie, private):
104     """Called when a connection has been established.
105
106     @type ie: Subclass of L{_DiskImportExportBase}
107     @param ie: Import/export object
108     @param private: Private data passed to import/export object
109
110     """
111
112   def ReportProgress(self, ie, private):
113     """Called when new progress information should be reported.
114
115     @type ie: Subclass of L{_DiskImportExportBase}
116     @param ie: Import/export object
117     @param private: Private data passed to import/export object
118
119     """
120
121   def ReportFinished(self, ie, private):
122     """Called when a transfer has finished.
123
124     @type ie: Subclass of L{_DiskImportExportBase}
125     @param ie: Import/export object
126     @param private: Private data passed to import/export object
127
128     """
129
130
131 def _TimeoutExpired(epoch, timeout, _time_fn=time.time):
132   """Checks whether a timeout has expired.
133
134   """
135   return _time_fn() > (epoch + timeout)
136
137
138 class _DiskImportExportBase(object):
139   MODE_TEXT = None
140
141   def __init__(self, lu, node_name, opts,
142                instance, timeouts, cbs, private=None):
143     """Initializes this class.
144
145     @param lu: Logical unit instance
146     @type node_name: string
147     @param node_name: Node name for import
148     @type opts: L{objects.ImportExportOptions}
149     @param opts: Import/export daemon options
150     @type instance: L{objects.Instance}
151     @param instance: Instance object
152     @type timeouts: L{ImportExportTimeouts}
153     @param timeouts: Timeouts for this import
154     @type cbs: L{ImportExportCbBase}
155     @param cbs: Callbacks
156     @param private: Private data for callback functions
157
158     """
159     assert self.MODE_TEXT
160
161     self._lu = lu
162     self.node_name = node_name
163     self._opts = opts
164     self._instance = instance
165     self._timeouts = timeouts
166     self._cbs = cbs
167     self._private = private
168
169     # Parent loop
170     self._loop = None
171
172     # Timestamps
173     self._ts_begin = None
174     self._ts_connected = None
175     self._ts_finished = None
176     self._ts_cleanup = None
177     self._ts_last_progress = None
178     self._ts_last_error = None
179
180     # Transfer status
181     self.success = None
182     self.final_message = None
183
184     # Daemon status
185     self._daemon_name = None
186     self._daemon = None
187
188   @property
189   def recent_output(self):
190     """Returns the most recent output from the daemon.
191
192     """
193     if self._daemon:
194       return self._daemon.recent_output
195
196     return None
197
198   @property
199   def progress(self):
200     """Returns transfer progress information.
201
202     """
203     if not self._daemon:
204       return None
205
206     return (self._daemon.progress_mbytes,
207             self._daemon.progress_throughput,
208             self._daemon.progress_percent,
209             self._daemon.progress_eta)
210
211   @property
212   def active(self):
213     """Determines whether this transport is still active.
214
215     """
216     return self.success is None
217
218   @property
219   def loop(self):
220     """Returns parent loop.
221
222     @rtype: L{ImportExportLoop}
223
224     """
225     return self._loop
226
227   def SetLoop(self, loop):
228     """Sets the parent loop.
229
230     @type loop: L{ImportExportLoop}
231
232     """
233     if self._loop:
234       raise errors.ProgrammerError("Loop can only be set once")
235
236     self._loop = loop
237
238   def _StartDaemon(self):
239     """Starts the import/export daemon.
240
241     """
242     raise NotImplementedError()
243
244   def CheckDaemon(self):
245     """Checks whether daemon has been started and if not, starts it.
246
247     @rtype: string
248     @return: Daemon name
249
250     """
251     assert self._ts_cleanup is None
252
253     if self._daemon_name is None:
254       assert self._ts_begin is None
255
256       result = self._StartDaemon()
257       if result.fail_msg:
258         raise _ImportExportError("Failed to start %s on %s: %s" %
259                                  (self.MODE_TEXT, self.node_name,
260                                   result.fail_msg))
261
262       daemon_name = result.payload
263
264       logging.info("Started %s %r on %s", self.MODE_TEXT, daemon_name,
265                    self.node_name)
266
267       self._ts_begin = time.time()
268       self._daemon_name = daemon_name
269
270     return self._daemon_name
271
272   def GetDaemonName(self):
273     """Returns the daemon name.
274
275     """
276     assert self._daemon_name, "Daemon has not been started"
277     assert self._ts_cleanup is None
278     return self._daemon_name
279
280   def Abort(self):
281     """Sends SIGTERM to import/export daemon (if still active).
282
283     """
284     if self._daemon_name:
285       self._lu.LogWarning("Aborting %s %r on %s",
286                           self.MODE_TEXT, self._daemon_name, self.node_name)
287       result = self._lu.rpc.call_impexp_abort(self.node_name, self._daemon_name)
288       if result.fail_msg:
289         self._lu.LogWarning("Failed to abort %s %r on %s: %s",
290                             self.MODE_TEXT, self._daemon_name,
291                             self.node_name, result.fail_msg)
292         return False
293
294     return True
295
296   def _SetDaemonData(self, data):
297     """Internal function for updating status daemon data.
298
299     @type data: L{objects.ImportExportStatus}
300     @param data: Daemon status data
301
302     """
303     assert self._ts_begin is not None
304
305     if not data:
306       if _TimeoutExpired(self._ts_begin, self._timeouts.ready):
307         raise _ImportExportError("Didn't become ready after %s seconds" %
308                                  self._timeouts.ready)
309
310       return False
311
312     self._daemon = data
313
314     return True
315
316   def SetDaemonData(self, success, data):
317     """Updates daemon status data.
318
319     @type success: bool
320     @param success: Whether fetching data was successful or not
321     @type data: L{objects.ImportExportStatus}
322     @param data: Daemon status data
323
324     """
325     if not success:
326       if self._ts_last_error is None:
327         self._ts_last_error = time.time()
328
329       elif _TimeoutExpired(self._ts_last_error, self._timeouts.error):
330         raise _ImportExportError("Too many errors while updating data")
331
332       return False
333
334     self._ts_last_error = None
335
336     return self._SetDaemonData(data)
337
338   def CheckListening(self):
339     """Checks whether the daemon is listening.
340
341     """
342     raise NotImplementedError()
343
344   def _GetConnectedCheckEpoch(self):
345     """Returns timeout to calculate connect timeout.
346
347     """
348     raise NotImplementedError()
349
350   def CheckConnected(self):
351     """Checks whether the daemon is connected.
352
353     @rtype: bool
354     @return: Whether the daemon is connected
355
356     """
357     assert self._daemon, "Daemon status missing"
358
359     if self._ts_connected is not None:
360       return True
361
362     if self._daemon.connected:
363       self._ts_connected = time.time()
364
365       # TODO: Log remote peer
366       logging.debug("%s %r on %s is now connected",
367                     self.MODE_TEXT, self._daemon_name, self.node_name)
368
369       self._cbs.ReportConnected(self, self._private)
370
371       return True
372
373     if _TimeoutExpired(self._GetConnectedCheckEpoch(), self._timeouts.connect):
374       raise _ImportExportError("Not connected after %s seconds" %
375                                self._timeouts.connect)
376
377     return False
378
379   def _CheckProgress(self):
380     """Checks whether a progress update should be reported.
381
382     """
383     if ((self._ts_last_progress is None or
384          _TimeoutExpired(self._ts_last_progress, self._timeouts.progress)) and
385         self._daemon and
386         self._daemon.progress_mbytes is not None and
387         self._daemon.progress_throughput is not None):
388       self._cbs.ReportProgress(self, self._private)
389       self._ts_last_progress = time.time()
390
391   def CheckFinished(self):
392     """Checks whether the daemon exited.
393
394     @rtype: bool
395     @return: Whether the transfer is finished
396
397     """
398     assert self._daemon, "Daemon status missing"
399
400     if self._ts_finished:
401       return True
402
403     if self._daemon.exit_status is None:
404       # TODO: Adjust delay for ETA expiring soon
405       self._CheckProgress()
406       return False
407
408     self._ts_finished = time.time()
409
410     self._ReportFinished(self._daemon.exit_status == 0,
411                          self._daemon.error_message)
412
413     return True
414
415   def _ReportFinished(self, success, message):
416     """Transfer is finished or daemon exited.
417
418     @type success: bool
419     @param success: Whether the transfer was successful
420     @type message: string
421     @param message: Error message
422
423     """
424     assert self.success is None
425
426     self.success = success
427     self.final_message = message
428
429     if success:
430       logging.info("%s %r on %s succeeded", self.MODE_TEXT, self._daemon_name,
431                    self.node_name)
432     elif self._daemon_name:
433       self._lu.LogWarning("%s %r on %s failed: %s",
434                           self.MODE_TEXT, self._daemon_name, self.node_name,
435                           message)
436     else:
437       self._lu.LogWarning("%s on %s failed: %s", self.MODE_TEXT,
438                           self.node_name, message)
439
440     self._cbs.ReportFinished(self, self._private)
441
442   def _Finalize(self):
443     """Makes the RPC call to finalize this import/export.
444
445     """
446     return self._lu.rpc.call_impexp_cleanup(self.node_name, self._daemon_name)
447
448   def Finalize(self, error=None):
449     """Finalizes this import/export.
450
451     """
452     if self._daemon_name:
453       logging.info("Finalizing %s %r on %s",
454                    self.MODE_TEXT, self._daemon_name, self.node_name)
455
456       result = self._Finalize()
457       if result.fail_msg:
458         self._lu.LogWarning("Failed to finalize %s %r on %s: %s",
459                             self.MODE_TEXT, self._daemon_name,
460                             self.node_name, result.fail_msg)
461         return False
462
463       # Daemon is no longer running
464       self._daemon_name = None
465       self._ts_cleanup = time.time()
466
467     if error:
468       self._ReportFinished(False, error)
469
470     return True
471
472
473 class DiskImport(_DiskImportExportBase):
474   MODE_TEXT = "import"
475
476   def __init__(self, lu, node_name, opts, instance,
477                dest, dest_args, timeouts, cbs, private=None):
478     """Initializes this class.
479
480     @param lu: Logical unit instance
481     @type node_name: string
482     @param node_name: Node name for import
483     @type opts: L{objects.ImportExportOptions}
484     @param opts: Import/export daemon options
485     @type instance: L{objects.Instance}
486     @param instance: Instance object
487     @param dest: I/O destination
488     @param dest_args: I/O arguments
489     @type timeouts: L{ImportExportTimeouts}
490     @param timeouts: Timeouts for this import
491     @type cbs: L{ImportExportCbBase}
492     @param cbs: Callbacks
493     @param private: Private data for callback functions
494
495     """
496     _DiskImportExportBase.__init__(self, lu, node_name, opts,
497                                    instance, timeouts, cbs, private)
498     self._dest = dest
499     self._dest_args = dest_args
500
501     # Timestamps
502     self._ts_listening = None
503
504   @property
505   def listen_port(self):
506     """Returns the port the daemon is listening on.
507
508     """
509     if self._daemon:
510       return self._daemon.listen_port
511
512     return None
513
514   def _StartDaemon(self):
515     """Starts the import daemon.
516
517     """
518     return self._lu.rpc.call_import_start(self.node_name, self._opts,
519                                           self._instance,
520                                           self._dest, self._dest_args)
521
522   def CheckListening(self):
523     """Checks whether the daemon is listening.
524
525     @rtype: bool
526     @return: Whether the daemon is listening
527
528     """
529     assert self._daemon, "Daemon status missing"
530
531     if self._ts_listening is not None:
532       return True
533
534     port = self._daemon.listen_port
535     if port is not None:
536       self._ts_listening = time.time()
537
538       logging.debug("Import %r on %s is now listening on port %s",
539                     self._daemon_name, self.node_name, port)
540
541       self._cbs.ReportListening(self, self._private)
542
543       return True
544
545     if _TimeoutExpired(self._ts_begin, self._timeouts.listen):
546       raise _ImportExportError("Not listening after %s seconds" %
547                                self._timeouts.listen)
548
549     return False
550
551   def _GetConnectedCheckEpoch(self):
552     """Returns the time since we started listening.
553
554     """
555     assert self._ts_listening is not None, \
556            ("Checking whether an import is connected is only useful"
557             " once it's been listening")
558
559     return self._ts_listening
560
561
562 class DiskExport(_DiskImportExportBase):
563   MODE_TEXT = "export"
564
565   def __init__(self, lu, node_name, opts,
566                dest_host, dest_port, instance, source, source_args,
567                timeouts, cbs, private=None):
568     """Initializes this class.
569
570     @param lu: Logical unit instance
571     @type node_name: string
572     @param node_name: Node name for import
573     @type opts: L{objects.ImportExportOptions}
574     @param opts: Import/export daemon options
575     @type dest_host: string
576     @param dest_host: Destination host name or IP address
577     @type dest_port: number
578     @param dest_port: Destination port number
579     @type instance: L{objects.Instance}
580     @param instance: Instance object
581     @param source: I/O source
582     @param source_args: I/O source
583     @type timeouts: L{ImportExportTimeouts}
584     @param timeouts: Timeouts for this import
585     @type cbs: L{ImportExportCbBase}
586     @param cbs: Callbacks
587     @param private: Private data for callback functions
588
589     """
590     _DiskImportExportBase.__init__(self, lu, node_name, opts,
591                                    instance, timeouts, cbs, private)
592     self._dest_host = dest_host
593     self._dest_port = dest_port
594     self._source = source
595     self._source_args = source_args
596
597   def _StartDaemon(self):
598     """Starts the export daemon.
599
600     """
601     return self._lu.rpc.call_export_start(self.node_name, self._opts,
602                                           self._dest_host, self._dest_port,
603                                           self._instance, self._source,
604                                           self._source_args)
605
606   def CheckListening(self):
607     """Checks whether the daemon is listening.
608
609     """
610     # Only an import can be listening
611     return True
612
613   def _GetConnectedCheckEpoch(self):
614     """Returns the time since the daemon started.
615
616     """
617     assert self._ts_begin is not None
618
619     return self._ts_begin
620
621
622 def FormatProgress(progress):
623   """Formats progress information for user consumption
624
625   """
626   (mbytes, throughput, percent, _) = progress
627
628   parts = [
629     utils.FormatUnit(mbytes, "h"),
630
631     # Not using FormatUnit as it doesn't support kilobytes
632     "%0.1f MiB/s" % throughput,
633     ]
634
635   if percent is not None:
636     parts.append("%d%%" % percent)
637
638   # TODO: Format ETA
639
640   return utils.CommaJoin(parts)
641
642
643 class ImportExportLoop:
644   MIN_DELAY = 1.0
645   MAX_DELAY = 20.0
646
647   def __init__(self, lu):
648     """Initializes this class.
649
650     """
651     self._lu = lu
652     self._queue = []
653     self._pending_add = []
654
655   def Add(self, diskie):
656     """Adds an import/export object to the loop.
657
658     @type diskie: Subclass of L{_DiskImportExportBase}
659     @param diskie: Import/export object
660
661     """
662     assert diskie not in self._pending_add
663     assert diskie.loop is None
664
665     diskie.SetLoop(self)
666
667     # Adding new objects to a staging list is necessary, otherwise the main
668     # loop gets confused if callbacks modify the queue while the main loop is
669     # iterating over it.
670     self._pending_add.append(diskie)
671
672   @staticmethod
673   def _CollectDaemonStatus(lu, daemons):
674     """Collects the status for all import/export daemons.
675
676     """
677     daemon_status = {}
678
679     for node_name, names in daemons.iteritems():
680       result = lu.rpc.call_impexp_status(node_name, names)
681       if result.fail_msg:
682         lu.LogWarning("Failed to get daemon status on %s: %s",
683                       node_name, result.fail_msg)
684         continue
685
686       assert len(names) == len(result.payload)
687
688       daemon_status[node_name] = dict(zip(names, result.payload))
689
690     return daemon_status
691
692   @staticmethod
693   def _GetActiveDaemonNames(queue):
694     """Gets the names of all active daemons.
695
696     """
697     result = {}
698     for diskie in queue:
699       if not diskie.active:
700         continue
701
702       try:
703         # Start daemon if necessary
704         daemon_name = diskie.CheckDaemon()
705       except _ImportExportError, err:
706         logging.exception("%s failed", diskie.MODE_TEXT)
707         diskie.Finalize(error=str(err))
708         continue
709
710       result.setdefault(diskie.node_name, []).append(daemon_name)
711
712     assert len(queue) >= len(result)
713     assert len(queue) >= sum([len(names) for names in result.itervalues()])
714
715     logging.debug("daemons=%r", result)
716
717     return result
718
719   def _AddPendingToQueue(self):
720     """Adds all pending import/export objects to the internal queue.
721
722     """
723     assert compat.all(diskie not in self._queue and diskie.loop == self
724                       for diskie in self._pending_add)
725
726     self._queue.extend(self._pending_add)
727
728     del self._pending_add[:]
729
730   def Run(self):
731     """Utility main loop.
732
733     """
734     while True:
735       self._AddPendingToQueue()
736
737       # Collect all active daemon names
738       daemons = self._GetActiveDaemonNames(self._queue)
739       if not daemons:
740         break
741
742       # Collection daemon status data
743       data = self._CollectDaemonStatus(self._lu, daemons)
744
745       # Use data
746       delay = self.MAX_DELAY
747       for diskie in self._queue:
748         if not diskie.active:
749           continue
750
751         try:
752           try:
753             all_daemon_data = data[diskie.node_name]
754           except KeyError:
755             result = diskie.SetDaemonData(False, None)
756           else:
757             result = \
758               diskie.SetDaemonData(True,
759                                    all_daemon_data[diskie.GetDaemonName()])
760
761           if not result:
762             # Daemon not yet ready, retry soon
763             delay = min(3.0, delay)
764             continue
765
766           if diskie.CheckFinished():
767             # Transfer finished
768             diskie.Finalize()
769             continue
770
771           # Normal case: check again in 5 seconds
772           delay = min(5.0, delay)
773
774           if not diskie.CheckListening():
775             # Not yet listening, retry soon
776             delay = min(1.0, delay)
777             continue
778
779           if not diskie.CheckConnected():
780             # Not yet connected, retry soon
781             delay = min(1.0, delay)
782             continue
783
784         except _ImportExportError, err:
785           logging.exception("%s failed", diskie.MODE_TEXT)
786           diskie.Finalize(error=str(err))
787
788       if not compat.any([diskie.active for diskie in self._queue]):
789         break
790
791       # Wait a bit
792       delay = min(self.MAX_DELAY, max(self.MIN_DELAY, delay))
793       logging.debug("Waiting for %ss", delay)
794       time.sleep(delay)
795
796   def FinalizeAll(self):
797     """Finalizes all pending transfers.
798
799     """
800     success = True
801
802     for diskie in self._queue:
803       success = diskie.Finalize() and success
804
805     return success
806
807
808 class _TransferInstCbBase(ImportExportCbBase):
809   def __init__(self, lu, feedback_fn, instance, timeouts, src_node, src_cbs,
810                dest_node, dest_ip, export_opts):
811     """Initializes this class.
812
813     """
814     ImportExportCbBase.__init__(self)
815
816     self.lu = lu
817     self.feedback_fn = feedback_fn
818     self.instance = instance
819     self.timeouts = timeouts
820     self.src_node = src_node
821     self.src_cbs = src_cbs
822     self.dest_node = dest_node
823     self.dest_ip = dest_ip
824     self.export_opts = export_opts
825
826
827 class _TransferInstSourceCb(_TransferInstCbBase):
828   def ReportConnected(self, ie, dtp):
829     """Called when a connection has been established.
830
831     """
832     assert self.src_cbs is None
833     assert dtp.src_export == ie
834     assert dtp.dest_import
835
836     self.feedback_fn("%s is sending data on %s" %
837                      (dtp.data.name, ie.node_name))
838
839   def ReportProgress(self, ie, dtp):
840     """Called when new progress information should be reported.
841
842     """
843     progress = ie.progress
844     if not progress:
845       return
846
847     self.feedback_fn("%s sent %s" % (dtp.data.name, FormatProgress(progress)))
848
849   def ReportFinished(self, ie, dtp):
850     """Called when a transfer has finished.
851
852     """
853     assert self.src_cbs is None
854     assert dtp.src_export == ie
855     assert dtp.dest_import
856
857     if ie.success:
858       self.feedback_fn("%s finished sending data" % dtp.data.name)
859     else:
860       self.feedback_fn("%s failed to send data: %s (recent output: %r)" %
861                        (dtp.data.name, ie.final_message, ie.recent_output))
862
863     dtp.RecordResult(ie.success)
864
865     cb = dtp.data.finished_fn
866     if cb:
867       cb()
868
869     # TODO: Check whether sending SIGTERM right away is okay, maybe we should
870     # give the daemon a moment to sort things out
871     if dtp.dest_import and not ie.success:
872       dtp.dest_import.Abort()
873
874
875 class _TransferInstDestCb(_TransferInstCbBase):
876   def ReportListening(self, ie, dtp):
877     """Called when daemon started listening.
878
879     """
880     assert self.src_cbs
881     assert dtp.src_export is None
882     assert dtp.dest_import
883
884     self.feedback_fn("%s is now listening, starting export" % dtp.data.name)
885
886     # Start export on source node
887     de = DiskExport(self.lu, self.src_node, self.export_opts,
888                     self.dest_ip, ie.listen_port,
889                     self.instance, dtp.data.src_io, dtp.data.src_ioargs,
890                     self.timeouts, self.src_cbs, private=dtp)
891     ie.loop.Add(de)
892
893     dtp.src_export = de
894
895   def ReportConnected(self, ie, dtp):
896     """Called when a connection has been established.
897
898     """
899     self.feedback_fn("%s is receiving data on %s" %
900                      (dtp.data.name, self.dest_node))
901
902   def ReportFinished(self, ie, dtp):
903     """Called when a transfer has finished.
904
905     """
906     if ie.success:
907       self.feedback_fn("%s finished receiving data" % dtp.data.name)
908     else:
909       self.feedback_fn("%s failed to receive data: %s (recent output: %r)" %
910                        (dtp.data.name, ie.final_message, ie.recent_output))
911
912     dtp.RecordResult(ie.success)
913
914     # TODO: Check whether sending SIGTERM right away is okay, maybe we should
915     # give the daemon a moment to sort things out
916     if dtp.src_export and not ie.success:
917       dtp.src_export.Abort()
918
919
920 class DiskTransfer(object):
921   def __init__(self, name, src_io, src_ioargs, dest_io, dest_ioargs,
922                finished_fn):
923     """Initializes this class.
924
925     @type name: string
926     @param name: User-visible name for this transfer (e.g. "disk/0")
927     @param src_io: Source I/O type
928     @param src_ioargs: Source I/O arguments
929     @param dest_io: Destination I/O type
930     @param dest_ioargs: Destination I/O arguments
931     @type finished_fn: callable
932     @param finished_fn: Function called once transfer has finished
933
934     """
935     self.name = name
936
937     self.src_io = src_io
938     self.src_ioargs = src_ioargs
939
940     self.dest_io = dest_io
941     self.dest_ioargs = dest_ioargs
942
943     self.finished_fn = finished_fn
944
945
946 class _DiskTransferPrivate(object):
947   def __init__(self, data, success):
948     """Initializes this class.
949
950     @type data: L{DiskTransfer}
951     @type success: bool
952
953     """
954     self.data = data
955
956     self.src_export = None
957     self.dest_import = None
958
959     self.success = success
960
961   def RecordResult(self, success):
962     """Updates the status.
963
964     One failed part will cause the whole transfer to fail.
965
966     """
967     self.success = self.success and success
968
969
970 def TransferInstanceData(lu, feedback_fn, src_node, dest_node, dest_ip,
971                          instance, all_transfers):
972   """Transfers an instance's data from one node to another.
973
974   @param lu: Logical unit instance
975   @param feedback_fn: Feedback function
976   @type src_node: string
977   @param src_node: Source node name
978   @type dest_node: string
979   @param dest_node: Destination node name
980   @type dest_ip: string
981   @param dest_ip: IP address of destination node
982   @type instance: L{objects.Instance}
983   @param instance: Instance object
984   @type all_transfers: list of L{DiskTransfer} instances
985   @param all_transfers: List of all disk transfers to be made
986   @rtype: list
987   @return: List with a boolean (True=successful, False=failed) for success for
988            each transfer
989
990   """
991   # Compress only if transfer is to another node
992   if src_node == dest_node:
993     compress = constants.IEC_NONE
994   else:
995     compress = constants.IEC_GZIP
996
997   logging.debug("Source node %s, destination node %s, compression '%s'",
998                 src_node, dest_node, compress)
999
1000   opts = objects.ImportExportOptions(key_name=None, ca_pem=None,
1001                                      compress=compress)
1002
1003   timeouts = ImportExportTimeouts(constants.DISK_TRANSFER_CONNECT_TIMEOUT)
1004   src_cbs = _TransferInstSourceCb(lu, feedback_fn, instance, timeouts,
1005                                   src_node, None, dest_node, dest_ip, opts)
1006   dest_cbs = _TransferInstDestCb(lu, feedback_fn, instance, timeouts,
1007                                  src_node, src_cbs, dest_node, dest_ip, opts)
1008
1009   all_dtp = []
1010
1011   ieloop = ImportExportLoop(lu)
1012   try:
1013     for transfer in all_transfers:
1014       if transfer:
1015         feedback_fn("Exporting %s from %s to %s" %
1016                     (transfer.name, src_node, dest_node))
1017
1018         dtp = _DiskTransferPrivate(transfer, True)
1019
1020         di = DiskImport(lu, dest_node, opts, instance,
1021                         transfer.dest_io, transfer.dest_ioargs,
1022                         timeouts, dest_cbs, private=dtp)
1023         ieloop.Add(di)
1024
1025         dtp.dest_import = di
1026       else:
1027         dtp = _DiskTransferPrivate(None, False)
1028
1029       all_dtp.append(dtp)
1030
1031     ieloop.Run()
1032   finally:
1033     ieloop.FinalizeAll()
1034
1035   assert len(all_dtp) == len(all_transfers)
1036   assert compat.all([(dtp.src_export is None or
1037                       dtp.src_export.success is not None) and
1038                      (dtp.dest_import is None or
1039                       dtp.dest_import.success is not None)
1040                      for dtp in all_dtp]), \
1041          "Not all imports/exports are finalized"
1042
1043   return [bool(dtp.success) for dtp in all_dtp]
1044
1045
1046 class _RemoteExportCb(ImportExportCbBase):
1047   def __init__(self, feedback_fn, disk_count):
1048     """Initializes this class.
1049
1050     """
1051     ImportExportCbBase.__init__(self)
1052     self._feedback_fn = feedback_fn
1053     self._dresults = [None] * disk_count
1054
1055   @property
1056   def disk_results(self):
1057     """Returns per-disk results.
1058
1059     """
1060     return self._dresults
1061
1062   def ReportConnected(self, ie, private):
1063     """Called when a connection has been established.
1064
1065     """
1066     (idx, _) = private
1067
1068     self._feedback_fn("Disk %s is now sending data" % idx)
1069
1070   def ReportProgress(self, ie, private):
1071     """Called when new progress information should be reported.
1072
1073     """
1074     (idx, _) = private
1075
1076     progress = ie.progress
1077     if not progress:
1078       return
1079
1080     self._feedback_fn("Disk %s sent %s" % (idx, FormatProgress(progress)))
1081
1082   def ReportFinished(self, ie, private):
1083     """Called when a transfer has finished.
1084
1085     """
1086     (idx, finished_fn) = private
1087
1088     if ie.success:
1089       self._feedback_fn("Disk %s finished sending data" % idx)
1090     else:
1091       self._feedback_fn("Disk %s failed to send data: %s (recent output: %r)" %
1092                         (idx, ie.final_message, ie.recent_output))
1093
1094     self._dresults[idx] = bool(ie.success)
1095
1096     if finished_fn:
1097       finished_fn()
1098
1099
1100 class ExportInstanceHelper:
1101   def __init__(self, lu, feedback_fn, instance):
1102     """Initializes this class.
1103
1104     @param lu: Logical unit instance
1105     @param feedback_fn: Feedback function
1106     @type instance: L{objects.Instance}
1107     @param instance: Instance object
1108
1109     """
1110     self._lu = lu
1111     self._feedback_fn = feedback_fn
1112     self._instance = instance
1113
1114     self._snap_disks = []
1115     self._removed_snaps = [False] * len(instance.disks)
1116
1117   def CreateSnapshots(self):
1118     """Creates an LVM snapshot for every disk of the instance.
1119
1120     """
1121     assert not self._snap_disks
1122
1123     instance = self._instance
1124     src_node = instance.primary_node
1125
1126     vgname = self._lu.cfg.GetVGName()
1127
1128     for idx, disk in enumerate(instance.disks):
1129       self._feedback_fn("Creating a snapshot of disk/%s on node %s" %
1130                         (idx, src_node))
1131
1132       # result.payload will be a snapshot of an lvm leaf of the one we
1133       # passed
1134       result = self._lu.rpc.call_blockdev_snapshot(src_node, disk)
1135       msg = result.fail_msg
1136       if msg:
1137         self._lu.LogWarning("Could not snapshot disk/%s on node %s: %s",
1138                             idx, src_node, msg)
1139         new_dev = False
1140       else:
1141         disk_id = (vgname, result.payload)
1142         new_dev = objects.Disk(dev_type=constants.LD_LV, size=disk.size,
1143                                logical_id=disk_id, physical_id=disk_id,
1144                                iv_name=disk.iv_name)
1145
1146       self._snap_disks.append(new_dev)
1147
1148     assert len(self._snap_disks) == len(instance.disks)
1149     assert len(self._removed_snaps) == len(instance.disks)
1150
1151   def _RemoveSnapshot(self, disk_index):
1152     """Removes an LVM snapshot.
1153
1154     @type disk_index: number
1155     @param disk_index: Index of the snapshot to be removed
1156
1157     """
1158     disk = self._snap_disks[disk_index]
1159     if disk and not self._removed_snaps[disk_index]:
1160       src_node = self._instance.primary_node
1161
1162       self._feedback_fn("Removing snapshot of disk/%s on node %s" %
1163                         (disk_index, src_node))
1164
1165       result = self._lu.rpc.call_blockdev_remove(src_node, disk)
1166       if result.fail_msg:
1167         self._lu.LogWarning("Could not remove snapshot for disk/%d from node"
1168                             " %s: %s", disk_index, src_node, result.fail_msg)
1169       else:
1170         self._removed_snaps[disk_index] = True
1171
1172   def LocalExport(self, dest_node):
1173     """Intra-cluster instance export.
1174
1175     @type dest_node: L{objects.Node}
1176     @param dest_node: Destination node
1177
1178     """
1179     instance = self._instance
1180     src_node = instance.primary_node
1181
1182     assert len(self._snap_disks) == len(instance.disks)
1183
1184     transfers = []
1185
1186     for idx, dev in enumerate(self._snap_disks):
1187       if not dev:
1188         transfers.append(None)
1189         continue
1190
1191       path = utils.PathJoin(constants.EXPORT_DIR, "%s.new" % instance.name,
1192                             dev.physical_id[1])
1193
1194       finished_fn = compat.partial(self._TransferFinished, idx)
1195
1196       # FIXME: pass debug option from opcode to backend
1197       dt = DiskTransfer("snapshot/%s" % idx,
1198                         constants.IEIO_SCRIPT, (dev, idx),
1199                         constants.IEIO_FILE, (path, ),
1200                         finished_fn)
1201       transfers.append(dt)
1202
1203     # Actually export data
1204     dresults = TransferInstanceData(self._lu, self._feedback_fn,
1205                                     src_node, dest_node.name,
1206                                     dest_node.secondary_ip,
1207                                     instance, transfers)
1208
1209     assert len(dresults) == len(instance.disks)
1210
1211     self._feedback_fn("Finalizing export on %s" % dest_node.name)
1212     result = self._lu.rpc.call_finalize_export(dest_node.name, instance,
1213                                                self._snap_disks)
1214     msg = result.fail_msg
1215     fin_resu = not msg
1216     if msg:
1217       self._lu.LogWarning("Could not finalize export for instance %s"
1218                           " on node %s: %s", instance.name, dest_node.name, msg)
1219
1220     return (fin_resu, dresults)
1221
1222   def RemoteExport(self, opts, disk_info, timeouts):
1223     """Inter-cluster instance export.
1224
1225     @type opts: L{objects.ImportExportOptions}
1226     @param opts: Import/export daemon options
1227     @type disk_info: list
1228     @param disk_info: Per-disk destination information
1229     @type timeouts: L{ImportExportTimeouts}
1230     @param timeouts: Timeouts for this import
1231
1232     """
1233     instance = self._instance
1234
1235     assert len(disk_info) == len(instance.disks)
1236
1237     cbs = _RemoteExportCb(self._feedback_fn, len(instance.disks))
1238
1239     ieloop = ImportExportLoop(self._lu)
1240     try:
1241       for idx, (dev, (host, port, _, _)) in enumerate(zip(instance.disks,
1242                                                           disk_info)):
1243         self._feedback_fn("Sending disk %s to %s:%s" % (idx, host, port))
1244         finished_fn = compat.partial(self._TransferFinished, idx)
1245         ieloop.Add(DiskExport(self._lu, instance.primary_node,
1246                               opts, host, port, instance,
1247                               constants.IEIO_SCRIPT, (dev, idx),
1248                               timeouts, cbs, private=(idx, finished_fn)))
1249
1250       ieloop.Run()
1251     finally:
1252       ieloop.FinalizeAll()
1253
1254     return (True, cbs.disk_results)
1255
1256   def _TransferFinished(self, idx):
1257     """Called once a transfer has finished.
1258
1259     @type idx: number
1260     @param idx: Disk index
1261
1262     """
1263     logging.debug("Transfer %s finished", idx)
1264     self._RemoveSnapshot(idx)
1265
1266   def Cleanup(self):
1267     """Remove all snapshots.
1268
1269     """
1270     assert len(self._removed_snaps) == len(self._instance.disks)
1271     for idx in range(len(self._instance.disks)):
1272       self._RemoveSnapshot(idx)
1273
1274
1275 class _RemoteImportCb(ImportExportCbBase):
1276   def __init__(self, feedback_fn, cds, x509_cert_pem, disk_count,
1277                external_address):
1278     """Initializes this class.
1279
1280     @type cds: string
1281     @param cds: Cluster domain secret
1282     @type x509_cert_pem: string
1283     @param x509_cert_pem: CA used for signing import key
1284     @type disk_count: number
1285     @param disk_count: Number of disks
1286     @type external_address: string
1287     @param external_address: External address of destination node
1288
1289     """
1290     ImportExportCbBase.__init__(self)
1291     self._feedback_fn = feedback_fn
1292     self._cds = cds
1293     self._x509_cert_pem = x509_cert_pem
1294     self._disk_count = disk_count
1295     self._external_address = external_address
1296
1297     self._dresults = [None] * disk_count
1298     self._daemon_port = [None] * disk_count
1299
1300     self._salt = utils.GenerateSecret(8)
1301
1302   @property
1303   def disk_results(self):
1304     """Returns per-disk results.
1305
1306     """
1307     return self._dresults
1308
1309   def _CheckAllListening(self):
1310     """Checks whether all daemons are listening.
1311
1312     If all daemons are listening, the information is sent to the client.
1313
1314     """
1315     if not compat.all(dp is not None for dp in self._daemon_port):
1316       return
1317
1318     host = self._external_address
1319
1320     disks = []
1321     for idx, port in enumerate(self._daemon_port):
1322       disks.append(ComputeRemoteImportDiskInfo(self._cds, self._salt,
1323                                                idx, host, port))
1324
1325     assert len(disks) == self._disk_count
1326
1327     self._feedback_fn(constants.ELOG_REMOTE_IMPORT, {
1328       "disks": disks,
1329       "x509_ca": self._x509_cert_pem,
1330       })
1331
1332   def ReportListening(self, ie, private):
1333     """Called when daemon started listening.
1334
1335     """
1336     (idx, ) = private
1337
1338     self._feedback_fn("Disk %s is now listening" % idx)
1339
1340     assert self._daemon_port[idx] is None
1341
1342     self._daemon_port[idx] = ie.listen_port
1343
1344     self._CheckAllListening()
1345
1346   def ReportConnected(self, ie, private):
1347     """Called when a connection has been established.
1348
1349     """
1350     (idx, ) = private
1351
1352     self._feedback_fn("Disk %s is now receiving data" % idx)
1353
1354   def ReportFinished(self, ie, private):
1355     """Called when a transfer has finished.
1356
1357     """
1358     (idx, ) = private
1359
1360     # Daemon is certainly no longer listening
1361     self._daemon_port[idx] = None
1362
1363     if ie.success:
1364       self._feedback_fn("Disk %s finished receiving data" % idx)
1365     else:
1366       self._feedback_fn(("Disk %s failed to receive data: %s"
1367                          " (recent output: %r)") %
1368                         (idx, ie.final_message, ie.recent_output))
1369
1370     self._dresults[idx] = bool(ie.success)
1371
1372
1373 def RemoteImport(lu, feedback_fn, instance, source_x509_ca, cds, timeouts):
1374   """Imports an instance from another cluster.
1375
1376   @param lu: Logical unit instance
1377   @param feedback_fn: Feedback function
1378   @type instance: L{objects.Instance}
1379   @param instance: Instance object
1380   @type source_x509_ca: OpenSSL.crypto.X509
1381   @param source_x509_ca: Import source's X509 CA
1382   @type cds: string
1383   @param cds: Cluster domain secret
1384   @type timeouts: L{ImportExportTimeouts}
1385   @param timeouts: Timeouts for this import
1386
1387   """
1388   source_ca_pem = OpenSSL.crypto.dump_certificate(OpenSSL.crypto.FILETYPE_PEM,
1389                                                   source_x509_ca)
1390
1391   # Create crypto key
1392   result = lu.rpc.call_x509_cert_create(instance.primary_node,
1393                                         constants.RIE_CERT_VALIDITY)
1394   result.Raise("Can't create X509 key and certificate on %s" % result.node)
1395
1396   (x509_key_name, x509_cert_pem) = result.payload
1397   try:
1398     # Load certificate
1399     x509_cert = OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_PEM,
1400                                                 x509_cert_pem)
1401
1402     # Import daemon options
1403     opts = objects.ImportExportOptions(key_name=x509_key_name,
1404                                        ca_pem=source_ca_pem)
1405
1406     # Sign certificate
1407     signed_x509_cert_pem = \
1408       utils.SignX509Certificate(x509_cert, cds, utils.GenerateSecret(8))
1409
1410     cbs = _RemoteImportCb(feedback_fn, cds, signed_x509_cert_pem,
1411                           len(instance.disks), instance.primary_node)
1412
1413     ieloop = ImportExportLoop(lu)
1414     try:
1415       for idx, dev in enumerate(instance.disks):
1416         ieloop.Add(DiskImport(lu, instance.primary_node, opts, instance,
1417                               constants.IEIO_SCRIPT, (dev, idx),
1418                               timeouts, cbs, private=(idx, )))
1419
1420       ieloop.Run()
1421     finally:
1422       ieloop.FinalizeAll()
1423   finally:
1424     # Remove crypto key and certificate
1425     result = lu.rpc.call_x509_cert_remove(instance.primary_node, x509_key_name)
1426     result.Raise("Can't remove X509 key and certificate on %s" % result.node)
1427
1428   return cbs.disk_results
1429
1430
1431 def _GetImportExportHandshakeMessage(version):
1432   """Returns the handshake message for a RIE protocol version.
1433
1434   @type version: number
1435
1436   """
1437   return "%s:%s" % (version, constants.RIE_HANDSHAKE)
1438
1439
1440 def ComputeRemoteExportHandshake(cds):
1441   """Computes the remote import/export handshake.
1442
1443   @type cds: string
1444   @param cds: Cluster domain secret
1445
1446   """
1447   salt = utils.GenerateSecret(8)
1448   msg = _GetImportExportHandshakeMessage(constants.RIE_VERSION)
1449   return (constants.RIE_VERSION, utils.Sha1Hmac(cds, msg, salt=salt), salt)
1450
1451
1452 def CheckRemoteExportHandshake(cds, handshake):
1453   """Checks the handshake of a remote import/export.
1454
1455   @type cds: string
1456   @param cds: Cluster domain secret
1457   @type handshake: sequence
1458   @param handshake: Handshake sent by remote peer
1459
1460   """
1461   try:
1462     (version, hmac_digest, hmac_salt) = handshake
1463   except (TypeError, ValueError), err:
1464     return "Invalid data: %s" % err
1465
1466   if not utils.VerifySha1Hmac(cds, _GetImportExportHandshakeMessage(version),
1467                               hmac_digest, salt=hmac_salt):
1468     return "Hash didn't match, clusters don't share the same domain secret"
1469
1470   if version != constants.RIE_VERSION:
1471     return ("Clusters don't have the same remote import/export protocol"
1472             " (local=%s, remote=%s)" %
1473             (constants.RIE_VERSION, version))
1474
1475   return None
1476
1477
1478 def _GetRieDiskInfoMessage(disk_index, host, port):
1479   """Returns the hashed text for import/export disk information.
1480
1481   @type disk_index: number
1482   @param disk_index: Index of disk (included in hash)
1483   @type host: string
1484   @param host: Hostname
1485   @type port: number
1486   @param port: Daemon port
1487
1488   """
1489   return "%s:%s:%s" % (disk_index, host, port)
1490
1491
1492 def CheckRemoteExportDiskInfo(cds, disk_index, disk_info):
1493   """Verifies received disk information for an export.
1494
1495   @type cds: string
1496   @param cds: Cluster domain secret
1497   @type disk_index: number
1498   @param disk_index: Index of disk (included in hash)
1499   @type disk_info: sequence
1500   @param disk_info: Disk information sent by remote peer
1501
1502   """
1503   try:
1504     (host, port, hmac_digest, hmac_salt) = disk_info
1505   except (TypeError, ValueError), err:
1506     raise errors.GenericError("Invalid data: %s" % err)
1507
1508   if not (host and port):
1509     raise errors.GenericError("Missing destination host or port")
1510
1511   msg = _GetRieDiskInfoMessage(disk_index, host, port)
1512
1513   if not utils.VerifySha1Hmac(cds, msg, hmac_digest, salt=hmac_salt):
1514     raise errors.GenericError("HMAC is wrong")
1515
1516   return (host, port)
1517
1518
1519 def ComputeRemoteImportDiskInfo(cds, salt, disk_index, host, port):
1520   """Computes the signed disk information for a remote import.
1521
1522   @type cds: string
1523   @param cds: Cluster domain secret
1524   @type salt: string
1525   @param salt: HMAC salt
1526   @type disk_index: number
1527   @param disk_index: Index of disk (included in hash)
1528   @type host: string
1529   @param host: Hostname
1530   @type port: number
1531   @param port: Daemon port
1532
1533   """
1534   msg = _GetRieDiskInfoMessage(disk_index, host, port)
1535   hmac_digest = utils.Sha1Hmac(cds, msg, salt=salt)
1536   return (host, port, hmac_digest, salt)