Fix exit code of “gnt-cluster verify”
[ganeti-local] / lib / masterd / instance.py
1 #
2 #
3
4 # Copyright (C) 2010, 2011 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Instance-related functions and classes for masterd.
23
24 """
25
26 import logging
27 import time
28 import OpenSSL
29
30 from ganeti import constants
31 from ganeti import errors
32 from ganeti import compat
33 from ganeti import utils
34 from ganeti import objects
35 from ganeti import netutils
36
37
38 class _ImportExportError(Exception):
39   """Local exception to report import/export errors.
40
41   """
42
43
44 class ImportExportTimeouts(object):
45   #: Time until daemon starts writing status file
46   DEFAULT_READY_TIMEOUT = 10
47
48   #: Length of time until errors cause hard failure
49   DEFAULT_ERROR_TIMEOUT = 10
50
51   #: Time after which daemon must be listening
52   DEFAULT_LISTEN_TIMEOUT = 10
53
54   #: Progress update interval
55   DEFAULT_PROGRESS_INTERVAL = 60
56
57   __slots__ = [
58     "error",
59     "ready",
60     "listen",
61     "connect",
62     "progress",
63     ]
64
65   def __init__(self, connect,
66                listen=DEFAULT_LISTEN_TIMEOUT,
67                error=DEFAULT_ERROR_TIMEOUT,
68                ready=DEFAULT_READY_TIMEOUT,
69                progress=DEFAULT_PROGRESS_INTERVAL):
70     """Initializes this class.
71
72     @type connect: number
73     @param connect: Timeout for establishing connection
74     @type listen: number
75     @param listen: Timeout for starting to listen for connections
76     @type error: number
77     @param error: Length of time until errors cause hard failure
78     @type ready: number
79     @param ready: Timeout for daemon to become ready
80     @type progress: number
81     @param progress: Progress update interval
82
83     """
84     self.error = error
85     self.ready = ready
86     self.listen = listen
87     self.connect = connect
88     self.progress = progress
89
90
91 class ImportExportCbBase(object):
92   """Callbacks for disk import/export.
93
94   """
95   def ReportListening(self, ie, private, component):
96     """Called when daemon started listening.
97
98     @type ie: Subclass of L{_DiskImportExportBase}
99     @param ie: Import/export object
100     @param private: Private data passed to import/export object
101     @param component: transfer component name
102
103     """
104
105   def ReportConnected(self, ie, private):
106     """Called when a connection has been established.
107
108     @type ie: Subclass of L{_DiskImportExportBase}
109     @param ie: Import/export object
110     @param private: Private data passed to import/export object
111
112     """
113
114   def ReportProgress(self, ie, private):
115     """Called when new progress information should be reported.
116
117     @type ie: Subclass of L{_DiskImportExportBase}
118     @param ie: Import/export object
119     @param private: Private data passed to import/export object
120
121     """
122
123   def ReportFinished(self, ie, private):
124     """Called when a transfer has finished.
125
126     @type ie: Subclass of L{_DiskImportExportBase}
127     @param ie: Import/export object
128     @param private: Private data passed to import/export object
129
130     """
131
132
133 def _TimeoutExpired(epoch, timeout, _time_fn=time.time):
134   """Checks whether a timeout has expired.
135
136   """
137   return _time_fn() > (epoch + timeout)
138
139
140 class _DiskImportExportBase(object):
141   MODE_TEXT = None
142
143   def __init__(self, lu, node_name, opts,
144                instance, component, timeouts, cbs, private=None):
145     """Initializes this class.
146
147     @param lu: Logical unit instance
148     @type node_name: string
149     @param node_name: Node name for import
150     @type opts: L{objects.ImportExportOptions}
151     @param opts: Import/export daemon options
152     @type instance: L{objects.Instance}
153     @param instance: Instance object
154     @type component: string
155     @param component: which part of the instance is being imported
156     @type timeouts: L{ImportExportTimeouts}
157     @param timeouts: Timeouts for this import
158     @type cbs: L{ImportExportCbBase}
159     @param cbs: Callbacks
160     @param private: Private data for callback functions
161
162     """
163     assert self.MODE_TEXT
164
165     self._lu = lu
166     self.node_name = node_name
167     self._opts = opts.Copy()
168     self._instance = instance
169     self._component = component
170     self._timeouts = timeouts
171     self._cbs = cbs
172     self._private = private
173
174     # Set master daemon's timeout in options for import/export daemon
175     assert self._opts.connect_timeout is None
176     self._opts.connect_timeout = timeouts.connect
177
178     # Parent loop
179     self._loop = None
180
181     # Timestamps
182     self._ts_begin = None
183     self._ts_connected = None
184     self._ts_finished = None
185     self._ts_cleanup = None
186     self._ts_last_progress = None
187     self._ts_last_error = None
188
189     # Transfer status
190     self.success = None
191     self.final_message = None
192
193     # Daemon status
194     self._daemon_name = None
195     self._daemon = None
196
197   @property
198   def recent_output(self):
199     """Returns the most recent output from the daemon.
200
201     """
202     if self._daemon:
203       return "\n".join(self._daemon.recent_output)
204
205     return None
206
207   @property
208   def progress(self):
209     """Returns transfer progress information.
210
211     """
212     if not self._daemon:
213       return None
214
215     return (self._daemon.progress_mbytes,
216             self._daemon.progress_throughput,
217             self._daemon.progress_percent,
218             self._daemon.progress_eta)
219
220   @property
221   def magic(self):
222     """Returns the magic value for this import/export.
223
224     """
225     return self._opts.magic
226
227   @property
228   def active(self):
229     """Determines whether this transport is still active.
230
231     """
232     return self.success is None
233
234   @property
235   def loop(self):
236     """Returns parent loop.
237
238     @rtype: L{ImportExportLoop}
239
240     """
241     return self._loop
242
243   def SetLoop(self, loop):
244     """Sets the parent loop.
245
246     @type loop: L{ImportExportLoop}
247
248     """
249     if self._loop:
250       raise errors.ProgrammerError("Loop can only be set once")
251
252     self._loop = loop
253
254   def _StartDaemon(self):
255     """Starts the import/export daemon.
256
257     """
258     raise NotImplementedError()
259
260   def CheckDaemon(self):
261     """Checks whether daemon has been started and if not, starts it.
262
263     @rtype: string
264     @return: Daemon name
265
266     """
267     assert self._ts_cleanup is None
268
269     if self._daemon_name is None:
270       assert self._ts_begin is None
271
272       result = self._StartDaemon()
273       if result.fail_msg:
274         raise _ImportExportError("Failed to start %s on %s: %s" %
275                                  (self.MODE_TEXT, self.node_name,
276                                   result.fail_msg))
277
278       daemon_name = result.payload
279
280       logging.info("Started %s '%s' on %s", self.MODE_TEXT, daemon_name,
281                    self.node_name)
282
283       self._ts_begin = time.time()
284       self._daemon_name = daemon_name
285
286     return self._daemon_name
287
288   def GetDaemonName(self):
289     """Returns the daemon name.
290
291     """
292     assert self._daemon_name, "Daemon has not been started"
293     assert self._ts_cleanup is None
294     return self._daemon_name
295
296   def Abort(self):
297     """Sends SIGTERM to import/export daemon (if still active).
298
299     """
300     if self._daemon_name:
301       self._lu.LogWarning("Aborting %s '%s' on %s",
302                           self.MODE_TEXT, self._daemon_name, self.node_name)
303       result = self._lu.rpc.call_impexp_abort(self.node_name, self._daemon_name)
304       if result.fail_msg:
305         self._lu.LogWarning("Failed to abort %s '%s' on %s: %s",
306                             self.MODE_TEXT, self._daemon_name,
307                             self.node_name, result.fail_msg)
308         return False
309
310     return True
311
312   def _SetDaemonData(self, data):
313     """Internal function for updating status daemon data.
314
315     @type data: L{objects.ImportExportStatus}
316     @param data: Daemon status data
317
318     """
319     assert self._ts_begin is not None
320
321     if not data:
322       if _TimeoutExpired(self._ts_begin, self._timeouts.ready):
323         raise _ImportExportError("Didn't become ready after %s seconds" %
324                                  self._timeouts.ready)
325
326       return False
327
328     self._daemon = data
329
330     return True
331
332   def SetDaemonData(self, success, data):
333     """Updates daemon status data.
334
335     @type success: bool
336     @param success: Whether fetching data was successful or not
337     @type data: L{objects.ImportExportStatus}
338     @param data: Daemon status data
339
340     """
341     if not success:
342       if self._ts_last_error is None:
343         self._ts_last_error = time.time()
344
345       elif _TimeoutExpired(self._ts_last_error, self._timeouts.error):
346         raise _ImportExportError("Too many errors while updating data")
347
348       return False
349
350     self._ts_last_error = None
351
352     return self._SetDaemonData(data)
353
354   def CheckListening(self):
355     """Checks whether the daemon is listening.
356
357     """
358     raise NotImplementedError()
359
360   def _GetConnectedCheckEpoch(self):
361     """Returns timeout to calculate connect timeout.
362
363     """
364     raise NotImplementedError()
365
366   def CheckConnected(self):
367     """Checks whether the daemon is connected.
368
369     @rtype: bool
370     @return: Whether the daemon is connected
371
372     """
373     assert self._daemon, "Daemon status missing"
374
375     if self._ts_connected is not None:
376       return True
377
378     if self._daemon.connected:
379       self._ts_connected = time.time()
380
381       # TODO: Log remote peer
382       logging.debug("%s '%s' on %s is now connected",
383                     self.MODE_TEXT, self._daemon_name, self.node_name)
384
385       self._cbs.ReportConnected(self, self._private)
386
387       return True
388
389     if _TimeoutExpired(self._GetConnectedCheckEpoch(), self._timeouts.connect):
390       raise _ImportExportError("Not connected after %s seconds" %
391                                self._timeouts.connect)
392
393     return False
394
395   def _CheckProgress(self):
396     """Checks whether a progress update should be reported.
397
398     """
399     if ((self._ts_last_progress is None or
400          _TimeoutExpired(self._ts_last_progress, self._timeouts.progress)) and
401         self._daemon and
402         self._daemon.progress_mbytes is not None and
403         self._daemon.progress_throughput is not None):
404       self._cbs.ReportProgress(self, self._private)
405       self._ts_last_progress = time.time()
406
407   def CheckFinished(self):
408     """Checks whether the daemon exited.
409
410     @rtype: bool
411     @return: Whether the transfer is finished
412
413     """
414     assert self._daemon, "Daemon status missing"
415
416     if self._ts_finished:
417       return True
418
419     if self._daemon.exit_status is None:
420       # TODO: Adjust delay for ETA expiring soon
421       self._CheckProgress()
422       return False
423
424     self._ts_finished = time.time()
425
426     self._ReportFinished(self._daemon.exit_status == 0,
427                          self._daemon.error_message)
428
429     return True
430
431   def _ReportFinished(self, success, message):
432     """Transfer is finished or daemon exited.
433
434     @type success: bool
435     @param success: Whether the transfer was successful
436     @type message: string
437     @param message: Error message
438
439     """
440     assert self.success is None
441
442     self.success = success
443     self.final_message = message
444
445     if success:
446       logging.info("%s '%s' on %s succeeded", self.MODE_TEXT,
447                    self._daemon_name, self.node_name)
448     elif self._daemon_name:
449       self._lu.LogWarning("%s '%s' on %s failed: %s",
450                           self.MODE_TEXT, self._daemon_name, self.node_name,
451                           message)
452     else:
453       self._lu.LogWarning("%s on %s failed: %s", self.MODE_TEXT,
454                           self.node_name, message)
455
456     self._cbs.ReportFinished(self, self._private)
457
458   def _Finalize(self):
459     """Makes the RPC call to finalize this import/export.
460
461     """
462     return self._lu.rpc.call_impexp_cleanup(self.node_name, self._daemon_name)
463
464   def Finalize(self, error=None):
465     """Finalizes this import/export.
466
467     """
468     if self._daemon_name:
469       logging.info("Finalizing %s '%s' on %s",
470                    self.MODE_TEXT, self._daemon_name, self.node_name)
471
472       result = self._Finalize()
473       if result.fail_msg:
474         self._lu.LogWarning("Failed to finalize %s '%s' on %s: %s",
475                             self.MODE_TEXT, self._daemon_name,
476                             self.node_name, result.fail_msg)
477         return False
478
479       # Daemon is no longer running
480       self._daemon_name = None
481       self._ts_cleanup = time.time()
482
483     if error:
484       self._ReportFinished(False, error)
485
486     return True
487
488
489 class DiskImport(_DiskImportExportBase):
490   MODE_TEXT = "import"
491
492   def __init__(self, lu, node_name, opts, instance, component,
493                dest, dest_args, timeouts, cbs, private=None):
494     """Initializes this class.
495
496     @param lu: Logical unit instance
497     @type node_name: string
498     @param node_name: Node name for import
499     @type opts: L{objects.ImportExportOptions}
500     @param opts: Import/export daemon options
501     @type instance: L{objects.Instance}
502     @param instance: Instance object
503     @type component: string
504     @param component: which part of the instance is being imported
505     @param dest: I/O destination
506     @param dest_args: I/O arguments
507     @type timeouts: L{ImportExportTimeouts}
508     @param timeouts: Timeouts for this import
509     @type cbs: L{ImportExportCbBase}
510     @param cbs: Callbacks
511     @param private: Private data for callback functions
512
513     """
514     _DiskImportExportBase.__init__(self, lu, node_name, opts, instance,
515                                    component, timeouts, cbs, private)
516     self._dest = dest
517     self._dest_args = dest_args
518
519     # Timestamps
520     self._ts_listening = None
521
522   @property
523   def listen_port(self):
524     """Returns the port the daemon is listening on.
525
526     """
527     if self._daemon:
528       return self._daemon.listen_port
529
530     return None
531
532   def _StartDaemon(self):
533     """Starts the import daemon.
534
535     """
536     return self._lu.rpc.call_import_start(self.node_name, self._opts,
537                                           self._instance, self._component,
538                                           self._dest, self._dest_args)
539
540   def CheckListening(self):
541     """Checks whether the daemon is listening.
542
543     @rtype: bool
544     @return: Whether the daemon is listening
545
546     """
547     assert self._daemon, "Daemon status missing"
548
549     if self._ts_listening is not None:
550       return True
551
552     port = self._daemon.listen_port
553     if port is not None:
554       self._ts_listening = time.time()
555
556       logging.debug("Import '%s' on %s is now listening on port %s",
557                     self._daemon_name, self.node_name, port)
558
559       self._cbs.ReportListening(self, self._private, self._component)
560
561       return True
562
563     if _TimeoutExpired(self._ts_begin, self._timeouts.listen):
564       raise _ImportExportError("Not listening after %s seconds" %
565                                self._timeouts.listen)
566
567     return False
568
569   def _GetConnectedCheckEpoch(self):
570     """Returns the time since we started listening.
571
572     """
573     assert self._ts_listening is not None, \
574            ("Checking whether an import is connected is only useful"
575             " once it's been listening")
576
577     return self._ts_listening
578
579
580 class DiskExport(_DiskImportExportBase):
581   MODE_TEXT = "export"
582
583   def __init__(self, lu, node_name, opts, dest_host, dest_port,
584                instance, component, source, source_args,
585                timeouts, cbs, private=None):
586     """Initializes this class.
587
588     @param lu: Logical unit instance
589     @type node_name: string
590     @param node_name: Node name for import
591     @type opts: L{objects.ImportExportOptions}
592     @param opts: Import/export daemon options
593     @type dest_host: string
594     @param dest_host: Destination host name or IP address
595     @type dest_port: number
596     @param dest_port: Destination port number
597     @type instance: L{objects.Instance}
598     @param instance: Instance object
599     @type component: string
600     @param component: which part of the instance is being imported
601     @param source: I/O source
602     @param source_args: I/O source
603     @type timeouts: L{ImportExportTimeouts}
604     @param timeouts: Timeouts for this import
605     @type cbs: L{ImportExportCbBase}
606     @param cbs: Callbacks
607     @param private: Private data for callback functions
608
609     """
610     _DiskImportExportBase.__init__(self, lu, node_name, opts, instance,
611                                    component, timeouts, cbs, private)
612     self._dest_host = dest_host
613     self._dest_port = dest_port
614     self._source = source
615     self._source_args = source_args
616
617   def _StartDaemon(self):
618     """Starts the export daemon.
619
620     """
621     return self._lu.rpc.call_export_start(self.node_name, self._opts,
622                                           self._dest_host, self._dest_port,
623                                           self._instance, self._component,
624                                           self._source, self._source_args)
625
626   def CheckListening(self):
627     """Checks whether the daemon is listening.
628
629     """
630     # Only an import can be listening
631     return True
632
633   def _GetConnectedCheckEpoch(self):
634     """Returns the time since the daemon started.
635
636     """
637     assert self._ts_begin is not None
638
639     return self._ts_begin
640
641
642 def FormatProgress(progress):
643   """Formats progress information for user consumption
644
645   """
646   (mbytes, throughput, percent, eta) = progress
647
648   parts = [
649     utils.FormatUnit(mbytes, "h"),
650
651     # Not using FormatUnit as it doesn't support kilobytes
652     "%0.1f MiB/s" % throughput,
653     ]
654
655   if percent is not None:
656     parts.append("%d%%" % percent)
657
658   if eta is not None:
659     parts.append("ETA %s" % utils.FormatSeconds(eta))
660
661   return utils.CommaJoin(parts)
662
663
664 class ImportExportLoop:
665   MIN_DELAY = 1.0
666   MAX_DELAY = 20.0
667
668   def __init__(self, lu):
669     """Initializes this class.
670
671     """
672     self._lu = lu
673     self._queue = []
674     self._pending_add = []
675
676   def Add(self, diskie):
677     """Adds an import/export object to the loop.
678
679     @type diskie: Subclass of L{_DiskImportExportBase}
680     @param diskie: Import/export object
681
682     """
683     assert diskie not in self._pending_add
684     assert diskie.loop is None
685
686     diskie.SetLoop(self)
687
688     # Adding new objects to a staging list is necessary, otherwise the main
689     # loop gets confused if callbacks modify the queue while the main loop is
690     # iterating over it.
691     self._pending_add.append(diskie)
692
693   @staticmethod
694   def _CollectDaemonStatus(lu, daemons):
695     """Collects the status for all import/export daemons.
696
697     """
698     daemon_status = {}
699
700     for node_name, names in daemons.iteritems():
701       result = lu.rpc.call_impexp_status(node_name, names)
702       if result.fail_msg:
703         lu.LogWarning("Failed to get daemon status on %s: %s",
704                       node_name, result.fail_msg)
705         continue
706
707       assert len(names) == len(result.payload)
708
709       daemon_status[node_name] = dict(zip(names, result.payload))
710
711     return daemon_status
712
713   @staticmethod
714   def _GetActiveDaemonNames(queue):
715     """Gets the names of all active daemons.
716
717     """
718     result = {}
719     for diskie in queue:
720       if not diskie.active:
721         continue
722
723       try:
724         # Start daemon if necessary
725         daemon_name = diskie.CheckDaemon()
726       except _ImportExportError, err:
727         logging.exception("%s failed", diskie.MODE_TEXT)
728         diskie.Finalize(error=str(err))
729         continue
730
731       result.setdefault(diskie.node_name, []).append(daemon_name)
732
733     assert len(queue) >= len(result)
734     assert len(queue) >= sum([len(names) for names in result.itervalues()])
735
736     logging.debug("daemons=%r", result)
737
738     return result
739
740   def _AddPendingToQueue(self):
741     """Adds all pending import/export objects to the internal queue.
742
743     """
744     assert compat.all(diskie not in self._queue and diskie.loop == self
745                       for diskie in self._pending_add)
746
747     self._queue.extend(self._pending_add)
748
749     del self._pending_add[:]
750
751   def Run(self):
752     """Utility main loop.
753
754     """
755     while True:
756       self._AddPendingToQueue()
757
758       # Collect all active daemon names
759       daemons = self._GetActiveDaemonNames(self._queue)
760       if not daemons:
761         break
762
763       # Collection daemon status data
764       data = self._CollectDaemonStatus(self._lu, daemons)
765
766       # Use data
767       delay = self.MAX_DELAY
768       for diskie in self._queue:
769         if not diskie.active:
770           continue
771
772         try:
773           try:
774             all_daemon_data = data[diskie.node_name]
775           except KeyError:
776             result = diskie.SetDaemonData(False, None)
777           else:
778             result = \
779               diskie.SetDaemonData(True,
780                                    all_daemon_data[diskie.GetDaemonName()])
781
782           if not result:
783             # Daemon not yet ready, retry soon
784             delay = min(3.0, delay)
785             continue
786
787           if diskie.CheckFinished():
788             # Transfer finished
789             diskie.Finalize()
790             continue
791
792           # Normal case: check again in 5 seconds
793           delay = min(5.0, delay)
794
795           if not diskie.CheckListening():
796             # Not yet listening, retry soon
797             delay = min(1.0, delay)
798             continue
799
800           if not diskie.CheckConnected():
801             # Not yet connected, retry soon
802             delay = min(1.0, delay)
803             continue
804
805         except _ImportExportError, err:
806           logging.exception("%s failed", diskie.MODE_TEXT)
807           diskie.Finalize(error=str(err))
808
809       if not compat.any(diskie.active for diskie in self._queue):
810         break
811
812       # Wait a bit
813       delay = min(self.MAX_DELAY, max(self.MIN_DELAY, delay))
814       logging.debug("Waiting for %ss", delay)
815       time.sleep(delay)
816
817   def FinalizeAll(self):
818     """Finalizes all pending transfers.
819
820     """
821     success = True
822
823     for diskie in self._queue:
824       success = diskie.Finalize() and success
825
826     return success
827
828
829 class _TransferInstCbBase(ImportExportCbBase):
830   def __init__(self, lu, feedback_fn, instance, timeouts, src_node, src_cbs,
831                dest_node, dest_ip):
832     """Initializes this class.
833
834     """
835     ImportExportCbBase.__init__(self)
836
837     self.lu = lu
838     self.feedback_fn = feedback_fn
839     self.instance = instance
840     self.timeouts = timeouts
841     self.src_node = src_node
842     self.src_cbs = src_cbs
843     self.dest_node = dest_node
844     self.dest_ip = dest_ip
845
846
847 class _TransferInstSourceCb(_TransferInstCbBase):
848   def ReportConnected(self, ie, dtp):
849     """Called when a connection has been established.
850
851     """
852     assert self.src_cbs is None
853     assert dtp.src_export == ie
854     assert dtp.dest_import
855
856     self.feedback_fn("%s is sending data on %s" %
857                      (dtp.data.name, ie.node_name))
858
859   def ReportProgress(self, ie, dtp):
860     """Called when new progress information should be reported.
861
862     """
863     progress = ie.progress
864     if not progress:
865       return
866
867     self.feedback_fn("%s sent %s" % (dtp.data.name, FormatProgress(progress)))
868
869   def ReportFinished(self, ie, dtp):
870     """Called when a transfer has finished.
871
872     """
873     assert self.src_cbs is None
874     assert dtp.src_export == ie
875     assert dtp.dest_import
876
877     if ie.success:
878       self.feedback_fn("%s finished sending data" % dtp.data.name)
879     else:
880       self.feedback_fn("%s failed to send data: %s (recent output: %s)" %
881                        (dtp.data.name, ie.final_message, ie.recent_output))
882
883     dtp.RecordResult(ie.success)
884
885     cb = dtp.data.finished_fn
886     if cb:
887       cb()
888
889     # TODO: Check whether sending SIGTERM right away is okay, maybe we should
890     # give the daemon a moment to sort things out
891     if dtp.dest_import and not ie.success:
892       dtp.dest_import.Abort()
893
894
895 class _TransferInstDestCb(_TransferInstCbBase):
896   def ReportListening(self, ie, dtp, component):
897     """Called when daemon started listening.
898
899     """
900     assert self.src_cbs
901     assert dtp.src_export is None
902     assert dtp.dest_import
903     assert dtp.export_opts
904
905     self.feedback_fn("%s is now listening, starting export" % dtp.data.name)
906
907     # Start export on source node
908     de = DiskExport(self.lu, self.src_node, dtp.export_opts,
909                     self.dest_ip, ie.listen_port, self.instance,
910                     component, dtp.data.src_io, dtp.data.src_ioargs,
911                     self.timeouts, self.src_cbs, private=dtp)
912     ie.loop.Add(de)
913
914     dtp.src_export = de
915
916   def ReportConnected(self, ie, dtp):
917     """Called when a connection has been established.
918
919     """
920     self.feedback_fn("%s is receiving data on %s" %
921                      (dtp.data.name, self.dest_node))
922
923   def ReportFinished(self, ie, dtp):
924     """Called when a transfer has finished.
925
926     """
927     if ie.success:
928       self.feedback_fn("%s finished receiving data" % dtp.data.name)
929     else:
930       self.feedback_fn("%s failed to receive data: %s (recent output: %s)" %
931                        (dtp.data.name, ie.final_message, ie.recent_output))
932
933     dtp.RecordResult(ie.success)
934
935     # TODO: Check whether sending SIGTERM right away is okay, maybe we should
936     # give the daemon a moment to sort things out
937     if dtp.src_export and not ie.success:
938       dtp.src_export.Abort()
939
940
941 class DiskTransfer(object):
942   def __init__(self, name, src_io, src_ioargs, dest_io, dest_ioargs,
943                finished_fn):
944     """Initializes this class.
945
946     @type name: string
947     @param name: User-visible name for this transfer (e.g. "disk/0")
948     @param src_io: Source I/O type
949     @param src_ioargs: Source I/O arguments
950     @param dest_io: Destination I/O type
951     @param dest_ioargs: Destination I/O arguments
952     @type finished_fn: callable
953     @param finished_fn: Function called once transfer has finished
954
955     """
956     self.name = name
957
958     self.src_io = src_io
959     self.src_ioargs = src_ioargs
960
961     self.dest_io = dest_io
962     self.dest_ioargs = dest_ioargs
963
964     self.finished_fn = finished_fn
965
966
967 class _DiskTransferPrivate(object):
968   def __init__(self, data, success, export_opts):
969     """Initializes this class.
970
971     @type data: L{DiskTransfer}
972     @type success: bool
973
974     """
975     self.data = data
976     self.success = success
977     self.export_opts = export_opts
978
979     self.src_export = None
980     self.dest_import = None
981
982   def RecordResult(self, success):
983     """Updates the status.
984
985     One failed part will cause the whole transfer to fail.
986
987     """
988     self.success = self.success and success
989
990
991 def _GetInstDiskMagic(base, instance_name, index):
992   """Computes the magic value for a disk export or import.
993
994   @type base: string
995   @param base: Random seed value (can be the same for all disks of a transfer)
996   @type instance_name: string
997   @param instance_name: Name of instance
998   @type index: number
999   @param index: Disk index
1000
1001   """
1002   h = compat.sha1_hash()
1003   h.update(str(constants.RIE_VERSION))
1004   h.update(base)
1005   h.update(instance_name)
1006   h.update(str(index))
1007   return h.hexdigest()
1008
1009
1010 def TransferInstanceData(lu, feedback_fn, src_node, dest_node, dest_ip,
1011                          instance, all_transfers):
1012   """Transfers an instance's data from one node to another.
1013
1014   @param lu: Logical unit instance
1015   @param feedback_fn: Feedback function
1016   @type src_node: string
1017   @param src_node: Source node name
1018   @type dest_node: string
1019   @param dest_node: Destination node name
1020   @type dest_ip: string
1021   @param dest_ip: IP address of destination node
1022   @type instance: L{objects.Instance}
1023   @param instance: Instance object
1024   @type all_transfers: list of L{DiskTransfer} instances
1025   @param all_transfers: List of all disk transfers to be made
1026   @rtype: list
1027   @return: List with a boolean (True=successful, False=failed) for success for
1028            each transfer
1029
1030   """
1031   # Disable compression for all moves as these are all within the same cluster
1032   compress = constants.IEC_NONE
1033
1034   logging.debug("Source node %s, destination node %s, compression '%s'",
1035                 src_node, dest_node, compress)
1036
1037   timeouts = ImportExportTimeouts(constants.DISK_TRANSFER_CONNECT_TIMEOUT)
1038   src_cbs = _TransferInstSourceCb(lu, feedback_fn, instance, timeouts,
1039                                   src_node, None, dest_node, dest_ip)
1040   dest_cbs = _TransferInstDestCb(lu, feedback_fn, instance, timeouts,
1041                                  src_node, src_cbs, dest_node, dest_ip)
1042
1043   all_dtp = []
1044
1045   base_magic = utils.GenerateSecret(6)
1046
1047   ieloop = ImportExportLoop(lu)
1048   try:
1049     for idx, transfer in enumerate(all_transfers):
1050       if transfer:
1051         feedback_fn("Exporting %s from %s to %s" %
1052                     (transfer.name, src_node, dest_node))
1053
1054         magic = _GetInstDiskMagic(base_magic, instance.name, idx)
1055         opts = objects.ImportExportOptions(key_name=None, ca_pem=None,
1056                                            compress=compress, magic=magic)
1057
1058         dtp = _DiskTransferPrivate(transfer, True, opts)
1059
1060         di = DiskImport(lu, dest_node, opts, instance, "disk%d" % idx,
1061                         transfer.dest_io, transfer.dest_ioargs,
1062                         timeouts, dest_cbs, private=dtp)
1063         ieloop.Add(di)
1064
1065         dtp.dest_import = di
1066       else:
1067         dtp = _DiskTransferPrivate(None, False, None)
1068
1069       all_dtp.append(dtp)
1070
1071     ieloop.Run()
1072   finally:
1073     ieloop.FinalizeAll()
1074
1075   assert len(all_dtp) == len(all_transfers)
1076   assert compat.all((dtp.src_export is None or
1077                       dtp.src_export.success is not None) and
1078                      (dtp.dest_import is None or
1079                       dtp.dest_import.success is not None)
1080                      for dtp in all_dtp), \
1081          "Not all imports/exports are finalized"
1082
1083   return [bool(dtp.success) for dtp in all_dtp]
1084
1085
1086 class _RemoteExportCb(ImportExportCbBase):
1087   def __init__(self, feedback_fn, disk_count):
1088     """Initializes this class.
1089
1090     """
1091     ImportExportCbBase.__init__(self)
1092     self._feedback_fn = feedback_fn
1093     self._dresults = [None] * disk_count
1094
1095   @property
1096   def disk_results(self):
1097     """Returns per-disk results.
1098
1099     """
1100     return self._dresults
1101
1102   def ReportConnected(self, ie, private):
1103     """Called when a connection has been established.
1104
1105     """
1106     (idx, _) = private
1107
1108     self._feedback_fn("Disk %s is now sending data" % idx)
1109
1110   def ReportProgress(self, ie, private):
1111     """Called when new progress information should be reported.
1112
1113     """
1114     (idx, _) = private
1115
1116     progress = ie.progress
1117     if not progress:
1118       return
1119
1120     self._feedback_fn("Disk %s sent %s" % (idx, FormatProgress(progress)))
1121
1122   def ReportFinished(self, ie, private):
1123     """Called when a transfer has finished.
1124
1125     """
1126     (idx, finished_fn) = private
1127
1128     if ie.success:
1129       self._feedback_fn("Disk %s finished sending data" % idx)
1130     else:
1131       self._feedback_fn("Disk %s failed to send data: %s (recent output: %s)" %
1132                         (idx, ie.final_message, ie.recent_output))
1133
1134     self._dresults[idx] = bool(ie.success)
1135
1136     if finished_fn:
1137       finished_fn()
1138
1139
1140 class ExportInstanceHelper:
1141   def __init__(self, lu, feedback_fn, instance):
1142     """Initializes this class.
1143
1144     @param lu: Logical unit instance
1145     @param feedback_fn: Feedback function
1146     @type instance: L{objects.Instance}
1147     @param instance: Instance object
1148
1149     """
1150     self._lu = lu
1151     self._feedback_fn = feedback_fn
1152     self._instance = instance
1153
1154     self._snap_disks = []
1155     self._removed_snaps = [False] * len(instance.disks)
1156
1157   def CreateSnapshots(self):
1158     """Creates an LVM snapshot for every disk of the instance.
1159
1160     """
1161     assert not self._snap_disks
1162
1163     instance = self._instance
1164     src_node = instance.primary_node
1165
1166     for idx, disk in enumerate(instance.disks):
1167       self._feedback_fn("Creating a snapshot of disk/%s on node %s" %
1168                         (idx, src_node))
1169
1170       # result.payload will be a snapshot of an lvm leaf of the one we
1171       # passed
1172       result = self._lu.rpc.call_blockdev_snapshot(src_node, disk)
1173       new_dev = False
1174       msg = result.fail_msg
1175       if msg:
1176         self._lu.LogWarning("Could not snapshot disk/%s on node %s: %s",
1177                             idx, src_node, msg)
1178       elif (not isinstance(result.payload, (tuple, list)) or
1179             len(result.payload) != 2):
1180         self._lu.LogWarning("Could not snapshot disk/%s on node %s: invalid"
1181                             " result '%s'", idx, src_node, result.payload)
1182       else:
1183         disk_id = tuple(result.payload)
1184         new_dev = objects.Disk(dev_type=constants.LD_LV, size=disk.size,
1185                                logical_id=disk_id, physical_id=disk_id,
1186                                iv_name=disk.iv_name)
1187
1188       self._snap_disks.append(new_dev)
1189
1190     assert len(self._snap_disks) == len(instance.disks)
1191     assert len(self._removed_snaps) == len(instance.disks)
1192
1193   def _RemoveSnapshot(self, disk_index):
1194     """Removes an LVM snapshot.
1195
1196     @type disk_index: number
1197     @param disk_index: Index of the snapshot to be removed
1198
1199     """
1200     disk = self._snap_disks[disk_index]
1201     if disk and not self._removed_snaps[disk_index]:
1202       src_node = self._instance.primary_node
1203
1204       self._feedback_fn("Removing snapshot of disk/%s on node %s" %
1205                         (disk_index, src_node))
1206
1207       result = self._lu.rpc.call_blockdev_remove(src_node, disk)
1208       if result.fail_msg:
1209         self._lu.LogWarning("Could not remove snapshot for disk/%d from node"
1210                             " %s: %s", disk_index, src_node, result.fail_msg)
1211       else:
1212         self._removed_snaps[disk_index] = True
1213
1214   def LocalExport(self, dest_node):
1215     """Intra-cluster instance export.
1216
1217     @type dest_node: L{objects.Node}
1218     @param dest_node: Destination node
1219
1220     """
1221     instance = self._instance
1222     src_node = instance.primary_node
1223
1224     assert len(self._snap_disks) == len(instance.disks)
1225
1226     transfers = []
1227
1228     for idx, dev in enumerate(self._snap_disks):
1229       if not dev:
1230         transfers.append(None)
1231         continue
1232
1233       path = utils.PathJoin(constants.EXPORT_DIR, "%s.new" % instance.name,
1234                             dev.physical_id[1])
1235
1236       finished_fn = compat.partial(self._TransferFinished, idx)
1237
1238       # FIXME: pass debug option from opcode to backend
1239       dt = DiskTransfer("snapshot/%s" % idx,
1240                         constants.IEIO_SCRIPT, (dev, idx),
1241                         constants.IEIO_FILE, (path, ),
1242                         finished_fn)
1243       transfers.append(dt)
1244
1245     # Actually export data
1246     dresults = TransferInstanceData(self._lu, self._feedback_fn,
1247                                     src_node, dest_node.name,
1248                                     dest_node.secondary_ip,
1249                                     instance, transfers)
1250
1251     assert len(dresults) == len(instance.disks)
1252
1253     self._feedback_fn("Finalizing export on %s" % dest_node.name)
1254     result = self._lu.rpc.call_finalize_export(dest_node.name, instance,
1255                                                self._snap_disks)
1256     msg = result.fail_msg
1257     fin_resu = not msg
1258     if msg:
1259       self._lu.LogWarning("Could not finalize export for instance %s"
1260                           " on node %s: %s", instance.name, dest_node.name, msg)
1261
1262     return (fin_resu, dresults)
1263
1264   def RemoteExport(self, disk_info, key_name, dest_ca_pem, timeouts):
1265     """Inter-cluster instance export.
1266
1267     @type disk_info: list
1268     @param disk_info: Per-disk destination information
1269     @type key_name: string
1270     @param key_name: Name of X509 key to use
1271     @type dest_ca_pem: string
1272     @param dest_ca_pem: Destination X509 CA in PEM format
1273     @type timeouts: L{ImportExportTimeouts}
1274     @param timeouts: Timeouts for this import
1275
1276     """
1277     instance = self._instance
1278
1279     assert len(disk_info) == len(instance.disks)
1280
1281     cbs = _RemoteExportCb(self._feedback_fn, len(instance.disks))
1282
1283     ieloop = ImportExportLoop(self._lu)
1284     try:
1285       for idx, (dev, (host, port, magic)) in enumerate(zip(instance.disks,
1286                                                            disk_info)):
1287         # Decide whether to use IPv6
1288         ipv6 = netutils.IP6Address.IsValid(host)
1289
1290         opts = objects.ImportExportOptions(key_name=key_name,
1291                                            ca_pem=dest_ca_pem,
1292                                            magic=magic, ipv6=ipv6)
1293
1294         self._feedback_fn("Sending disk %s to %s:%s" % (idx, host, port))
1295         finished_fn = compat.partial(self._TransferFinished, idx)
1296         ieloop.Add(DiskExport(self._lu, instance.primary_node,
1297                               opts, host, port, instance, "disk%d" % idx,
1298                               constants.IEIO_SCRIPT, (dev, idx),
1299                               timeouts, cbs, private=(idx, finished_fn)))
1300
1301       ieloop.Run()
1302     finally:
1303       ieloop.FinalizeAll()
1304
1305     return (True, cbs.disk_results)
1306
1307   def _TransferFinished(self, idx):
1308     """Called once a transfer has finished.
1309
1310     @type idx: number
1311     @param idx: Disk index
1312
1313     """
1314     logging.debug("Transfer %s finished", idx)
1315     self._RemoveSnapshot(idx)
1316
1317   def Cleanup(self):
1318     """Remove all snapshots.
1319
1320     """
1321     assert len(self._removed_snaps) == len(self._instance.disks)
1322     for idx in range(len(self._instance.disks)):
1323       self._RemoveSnapshot(idx)
1324
1325
1326 class _RemoteImportCb(ImportExportCbBase):
1327   def __init__(self, feedback_fn, cds, x509_cert_pem, disk_count,
1328                external_address):
1329     """Initializes this class.
1330
1331     @type cds: string
1332     @param cds: Cluster domain secret
1333     @type x509_cert_pem: string
1334     @param x509_cert_pem: CA used for signing import key
1335     @type disk_count: number
1336     @param disk_count: Number of disks
1337     @type external_address: string
1338     @param external_address: External address of destination node
1339
1340     """
1341     ImportExportCbBase.__init__(self)
1342     self._feedback_fn = feedback_fn
1343     self._cds = cds
1344     self._x509_cert_pem = x509_cert_pem
1345     self._disk_count = disk_count
1346     self._external_address = external_address
1347
1348     self._dresults = [None] * disk_count
1349     self._daemon_port = [None] * disk_count
1350
1351     self._salt = utils.GenerateSecret(8)
1352
1353   @property
1354   def disk_results(self):
1355     """Returns per-disk results.
1356
1357     """
1358     return self._dresults
1359
1360   def _CheckAllListening(self):
1361     """Checks whether all daemons are listening.
1362
1363     If all daemons are listening, the information is sent to the client.
1364
1365     """
1366     if not compat.all(dp is not None for dp in self._daemon_port):
1367       return
1368
1369     host = self._external_address
1370
1371     disks = []
1372     for idx, (port, magic) in enumerate(self._daemon_port):
1373       disks.append(ComputeRemoteImportDiskInfo(self._cds, self._salt,
1374                                                idx, host, port, magic))
1375
1376     assert len(disks) == self._disk_count
1377
1378     self._feedback_fn(constants.ELOG_REMOTE_IMPORT, {
1379       "disks": disks,
1380       "x509_ca": self._x509_cert_pem,
1381       })
1382
1383   def ReportListening(self, ie, private, _):
1384     """Called when daemon started listening.
1385
1386     """
1387     (idx, ) = private
1388
1389     self._feedback_fn("Disk %s is now listening" % idx)
1390
1391     assert self._daemon_port[idx] is None
1392
1393     self._daemon_port[idx] = (ie.listen_port, ie.magic)
1394
1395     self._CheckAllListening()
1396
1397   def ReportConnected(self, ie, private):
1398     """Called when a connection has been established.
1399
1400     """
1401     (idx, ) = private
1402
1403     self._feedback_fn("Disk %s is now receiving data" % idx)
1404
1405   def ReportFinished(self, ie, private):
1406     """Called when a transfer has finished.
1407
1408     """
1409     (idx, ) = private
1410
1411     # Daemon is certainly no longer listening
1412     self._daemon_port[idx] = None
1413
1414     if ie.success:
1415       self._feedback_fn("Disk %s finished receiving data" % idx)
1416     else:
1417       self._feedback_fn(("Disk %s failed to receive data: %s"
1418                          " (recent output: %s)") %
1419                         (idx, ie.final_message, ie.recent_output))
1420
1421     self._dresults[idx] = bool(ie.success)
1422
1423
1424 def RemoteImport(lu, feedback_fn, instance, pnode, source_x509_ca,
1425                  cds, timeouts):
1426   """Imports an instance from another cluster.
1427
1428   @param lu: Logical unit instance
1429   @param feedback_fn: Feedback function
1430   @type instance: L{objects.Instance}
1431   @param instance: Instance object
1432   @type pnode: L{objects.Node}
1433   @param pnode: Primary node of instance as an object
1434   @type source_x509_ca: OpenSSL.crypto.X509
1435   @param source_x509_ca: Import source's X509 CA
1436   @type cds: string
1437   @param cds: Cluster domain secret
1438   @type timeouts: L{ImportExportTimeouts}
1439   @param timeouts: Timeouts for this import
1440
1441   """
1442   source_ca_pem = OpenSSL.crypto.dump_certificate(OpenSSL.crypto.FILETYPE_PEM,
1443                                                   source_x509_ca)
1444
1445   magic_base = utils.GenerateSecret(6)
1446
1447   # Decide whether to use IPv6
1448   ipv6 = netutils.IP6Address.IsValid(pnode.primary_ip)
1449
1450   # Create crypto key
1451   result = lu.rpc.call_x509_cert_create(instance.primary_node,
1452                                         constants.RIE_CERT_VALIDITY)
1453   result.Raise("Can't create X509 key and certificate on %s" % result.node)
1454
1455   (x509_key_name, x509_cert_pem) = result.payload
1456   try:
1457     # Load certificate
1458     x509_cert = OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_PEM,
1459                                                 x509_cert_pem)
1460
1461     # Sign certificate
1462     signed_x509_cert_pem = \
1463       utils.SignX509Certificate(x509_cert, cds, utils.GenerateSecret(8))
1464
1465     cbs = _RemoteImportCb(feedback_fn, cds, signed_x509_cert_pem,
1466                           len(instance.disks), pnode.primary_ip)
1467
1468     ieloop = ImportExportLoop(lu)
1469     try:
1470       for idx, dev in enumerate(instance.disks):
1471         magic = _GetInstDiskMagic(magic_base, instance.name, idx)
1472
1473         # Import daemon options
1474         opts = objects.ImportExportOptions(key_name=x509_key_name,
1475                                            ca_pem=source_ca_pem,
1476                                            magic=magic, ipv6=ipv6)
1477
1478         ieloop.Add(DiskImport(lu, instance.primary_node, opts, instance,
1479                               "disk%d" % idx,
1480                               constants.IEIO_SCRIPT, (dev, idx),
1481                               timeouts, cbs, private=(idx, )))
1482
1483       ieloop.Run()
1484     finally:
1485       ieloop.FinalizeAll()
1486   finally:
1487     # Remove crypto key and certificate
1488     result = lu.rpc.call_x509_cert_remove(instance.primary_node, x509_key_name)
1489     result.Raise("Can't remove X509 key and certificate on %s" % result.node)
1490
1491   return cbs.disk_results
1492
1493
1494 def _GetImportExportHandshakeMessage(version):
1495   """Returns the handshake message for a RIE protocol version.
1496
1497   @type version: number
1498
1499   """
1500   return "%s:%s" % (version, constants.RIE_HANDSHAKE)
1501
1502
1503 def ComputeRemoteExportHandshake(cds):
1504   """Computes the remote import/export handshake.
1505
1506   @type cds: string
1507   @param cds: Cluster domain secret
1508
1509   """
1510   salt = utils.GenerateSecret(8)
1511   msg = _GetImportExportHandshakeMessage(constants.RIE_VERSION)
1512   return (constants.RIE_VERSION, utils.Sha1Hmac(cds, msg, salt=salt), salt)
1513
1514
1515 def CheckRemoteExportHandshake(cds, handshake):
1516   """Checks the handshake of a remote import/export.
1517
1518   @type cds: string
1519   @param cds: Cluster domain secret
1520   @type handshake: sequence
1521   @param handshake: Handshake sent by remote peer
1522
1523   """
1524   try:
1525     (version, hmac_digest, hmac_salt) = handshake
1526   except (TypeError, ValueError), err:
1527     return "Invalid data: %s" % err
1528
1529   if not utils.VerifySha1Hmac(cds, _GetImportExportHandshakeMessage(version),
1530                               hmac_digest, salt=hmac_salt):
1531     return "Hash didn't match, clusters don't share the same domain secret"
1532
1533   if version != constants.RIE_VERSION:
1534     return ("Clusters don't have the same remote import/export protocol"
1535             " (local=%s, remote=%s)" %
1536             (constants.RIE_VERSION, version))
1537
1538   return None
1539
1540
1541 def _GetRieDiskInfoMessage(disk_index, host, port, magic):
1542   """Returns the hashed text for import/export disk information.
1543
1544   @type disk_index: number
1545   @param disk_index: Index of disk (included in hash)
1546   @type host: string
1547   @param host: Hostname
1548   @type port: number
1549   @param port: Daemon port
1550   @type magic: string
1551   @param magic: Magic value
1552
1553   """
1554   return "%s:%s:%s:%s" % (disk_index, host, port, magic)
1555
1556
1557 def CheckRemoteExportDiskInfo(cds, disk_index, disk_info):
1558   """Verifies received disk information for an export.
1559
1560   @type cds: string
1561   @param cds: Cluster domain secret
1562   @type disk_index: number
1563   @param disk_index: Index of disk (included in hash)
1564   @type disk_info: sequence
1565   @param disk_info: Disk information sent by remote peer
1566
1567   """
1568   try:
1569     (host, port, magic, hmac_digest, hmac_salt) = disk_info
1570   except (TypeError, ValueError), err:
1571     raise errors.GenericError("Invalid data: %s" % err)
1572
1573   if not (host and port and magic):
1574     raise errors.GenericError("Missing destination host, port or magic")
1575
1576   msg = _GetRieDiskInfoMessage(disk_index, host, port, magic)
1577
1578   if not utils.VerifySha1Hmac(cds, msg, hmac_digest, salt=hmac_salt):
1579     raise errors.GenericError("HMAC is wrong")
1580
1581   if netutils.IP6Address.IsValid(host) or netutils.IP4Address.IsValid(host):
1582     destination = host
1583   else:
1584     destination = netutils.Hostname.GetNormalizedName(host)
1585
1586   return (destination,
1587           utils.ValidateServiceName(port),
1588           magic)
1589
1590
1591 def ComputeRemoteImportDiskInfo(cds, salt, disk_index, host, port, magic):
1592   """Computes the signed disk information for a remote import.
1593
1594   @type cds: string
1595   @param cds: Cluster domain secret
1596   @type salt: string
1597   @param salt: HMAC salt
1598   @type disk_index: number
1599   @param disk_index: Index of disk (included in hash)
1600   @type host: string
1601   @param host: Hostname
1602   @type port: number
1603   @param port: Daemon port
1604   @type magic: string
1605   @param magic: Magic value
1606
1607   """
1608   msg = _GetRieDiskInfoMessage(disk_index, host, port, magic)
1609   hmac_digest = utils.Sha1Hmac(cds, msg, salt=salt)
1610   return (host, port, magic, hmac_digest, salt)