Import-export: fix logging of daemon output
[ganeti-local] / lib / masterd / instance.py
1 #
2 #
3
4 # Copyright (C) 2010, 2011 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Instance-related functions and classes for masterd.
23
24 """
25
26 import logging
27 import time
28 import OpenSSL
29
30 from ganeti import constants
31 from ganeti import errors
32 from ganeti import compat
33 from ganeti import utils
34 from ganeti import objects
35 from ganeti import netutils
36
37
38 class _ImportExportError(Exception):
39   """Local exception to report import/export errors.
40
41   """
42
43
44 class ImportExportTimeouts(object):
45   #: Time until daemon starts writing status file
46   DEFAULT_READY_TIMEOUT = 10
47
48   #: Length of time until errors cause hard failure
49   DEFAULT_ERROR_TIMEOUT = 10
50
51   #: Time after which daemon must be listening
52   DEFAULT_LISTEN_TIMEOUT = 10
53
54   #: Progress update interval
55   DEFAULT_PROGRESS_INTERVAL = 60
56
57   __slots__ = [
58     "error",
59     "ready",
60     "listen",
61     "connect",
62     "progress",
63     ]
64
65   def __init__(self, connect,
66                listen=DEFAULT_LISTEN_TIMEOUT,
67                error=DEFAULT_ERROR_TIMEOUT,
68                ready=DEFAULT_READY_TIMEOUT,
69                progress=DEFAULT_PROGRESS_INTERVAL):
70     """Initializes this class.
71
72     @type connect: number
73     @param connect: Timeout for establishing connection
74     @type listen: number
75     @param listen: Timeout for starting to listen for connections
76     @type error: number
77     @param error: Length of time until errors cause hard failure
78     @type ready: number
79     @param ready: Timeout for daemon to become ready
80     @type progress: number
81     @param progress: Progress update interval
82
83     """
84     self.error = error
85     self.ready = ready
86     self.listen = listen
87     self.connect = connect
88     self.progress = progress
89
90
91 class ImportExportCbBase(object):
92   """Callbacks for disk import/export.
93
94   """
95   def ReportListening(self, ie, private):
96     """Called when daemon started listening.
97
98     @type ie: Subclass of L{_DiskImportExportBase}
99     @param ie: Import/export object
100     @param private: Private data passed to import/export object
101
102     """
103
104   def ReportConnected(self, ie, private):
105     """Called when a connection has been established.
106
107     @type ie: Subclass of L{_DiskImportExportBase}
108     @param ie: Import/export object
109     @param private: Private data passed to import/export object
110
111     """
112
113   def ReportProgress(self, ie, private):
114     """Called when new progress information should be reported.
115
116     @type ie: Subclass of L{_DiskImportExportBase}
117     @param ie: Import/export object
118     @param private: Private data passed to import/export object
119
120     """
121
122   def ReportFinished(self, ie, private):
123     """Called when a transfer has finished.
124
125     @type ie: Subclass of L{_DiskImportExportBase}
126     @param ie: Import/export object
127     @param private: Private data passed to import/export object
128
129     """
130
131
132 def _TimeoutExpired(epoch, timeout, _time_fn=time.time):
133   """Checks whether a timeout has expired.
134
135   """
136   return _time_fn() > (epoch + timeout)
137
138
139 class _DiskImportExportBase(object):
140   MODE_TEXT = None
141
142   def __init__(self, lu, node_name, opts,
143                instance, timeouts, cbs, private=None):
144     """Initializes this class.
145
146     @param lu: Logical unit instance
147     @type node_name: string
148     @param node_name: Node name for import
149     @type opts: L{objects.ImportExportOptions}
150     @param opts: Import/export daemon options
151     @type instance: L{objects.Instance}
152     @param instance: Instance object
153     @type timeouts: L{ImportExportTimeouts}
154     @param timeouts: Timeouts for this import
155     @type cbs: L{ImportExportCbBase}
156     @param cbs: Callbacks
157     @param private: Private data for callback functions
158
159     """
160     assert self.MODE_TEXT
161
162     self._lu = lu
163     self.node_name = node_name
164     self._opts = opts.Copy()
165     self._instance = instance
166     self._timeouts = timeouts
167     self._cbs = cbs
168     self._private = private
169
170     # Set master daemon's timeout in options for import/export daemon
171     assert self._opts.connect_timeout is None
172     self._opts.connect_timeout = timeouts.connect
173
174     # Parent loop
175     self._loop = None
176
177     # Timestamps
178     self._ts_begin = None
179     self._ts_connected = None
180     self._ts_finished = None
181     self._ts_cleanup = None
182     self._ts_last_progress = None
183     self._ts_last_error = None
184
185     # Transfer status
186     self.success = None
187     self.final_message = None
188
189     # Daemon status
190     self._daemon_name = None
191     self._daemon = None
192
193   @property
194   def recent_output(self):
195     """Returns the most recent output from the daemon.
196
197     """
198     if self._daemon:
199       return "\n".join(self._daemon.recent_output)
200
201     return None
202
203   @property
204   def progress(self):
205     """Returns transfer progress information.
206
207     """
208     if not self._daemon:
209       return None
210
211     return (self._daemon.progress_mbytes,
212             self._daemon.progress_throughput,
213             self._daemon.progress_percent,
214             self._daemon.progress_eta)
215
216   @property
217   def magic(self):
218     """Returns the magic value for this import/export.
219
220     """
221     return self._opts.magic
222
223   @property
224   def active(self):
225     """Determines whether this transport is still active.
226
227     """
228     return self.success is None
229
230   @property
231   def loop(self):
232     """Returns parent loop.
233
234     @rtype: L{ImportExportLoop}
235
236     """
237     return self._loop
238
239   def SetLoop(self, loop):
240     """Sets the parent loop.
241
242     @type loop: L{ImportExportLoop}
243
244     """
245     if self._loop:
246       raise errors.ProgrammerError("Loop can only be set once")
247
248     self._loop = loop
249
250   def _StartDaemon(self):
251     """Starts the import/export daemon.
252
253     """
254     raise NotImplementedError()
255
256   def CheckDaemon(self):
257     """Checks whether daemon has been started and if not, starts it.
258
259     @rtype: string
260     @return: Daemon name
261
262     """
263     assert self._ts_cleanup is None
264
265     if self._daemon_name is None:
266       assert self._ts_begin is None
267
268       result = self._StartDaemon()
269       if result.fail_msg:
270         raise _ImportExportError("Failed to start %s on %s: %s" %
271                                  (self.MODE_TEXT, self.node_name,
272                                   result.fail_msg))
273
274       daemon_name = result.payload
275
276       logging.info("Started %s %r on %s", self.MODE_TEXT, daemon_name,
277                    self.node_name)
278
279       self._ts_begin = time.time()
280       self._daemon_name = daemon_name
281
282     return self._daemon_name
283
284   def GetDaemonName(self):
285     """Returns the daemon name.
286
287     """
288     assert self._daemon_name, "Daemon has not been started"
289     assert self._ts_cleanup is None
290     return self._daemon_name
291
292   def Abort(self):
293     """Sends SIGTERM to import/export daemon (if still active).
294
295     """
296     if self._daemon_name:
297       self._lu.LogWarning("Aborting %s %r on %s",
298                           self.MODE_TEXT, self._daemon_name, self.node_name)
299       result = self._lu.rpc.call_impexp_abort(self.node_name, self._daemon_name)
300       if result.fail_msg:
301         self._lu.LogWarning("Failed to abort %s %r on %s: %s",
302                             self.MODE_TEXT, self._daemon_name,
303                             self.node_name, result.fail_msg)
304         return False
305
306     return True
307
308   def _SetDaemonData(self, data):
309     """Internal function for updating status daemon data.
310
311     @type data: L{objects.ImportExportStatus}
312     @param data: Daemon status data
313
314     """
315     assert self._ts_begin is not None
316
317     if not data:
318       if _TimeoutExpired(self._ts_begin, self._timeouts.ready):
319         raise _ImportExportError("Didn't become ready after %s seconds" %
320                                  self._timeouts.ready)
321
322       return False
323
324     self._daemon = data
325
326     return True
327
328   def SetDaemonData(self, success, data):
329     """Updates daemon status data.
330
331     @type success: bool
332     @param success: Whether fetching data was successful or not
333     @type data: L{objects.ImportExportStatus}
334     @param data: Daemon status data
335
336     """
337     if not success:
338       if self._ts_last_error is None:
339         self._ts_last_error = time.time()
340
341       elif _TimeoutExpired(self._ts_last_error, self._timeouts.error):
342         raise _ImportExportError("Too many errors while updating data")
343
344       return False
345
346     self._ts_last_error = None
347
348     return self._SetDaemonData(data)
349
350   def CheckListening(self):
351     """Checks whether the daemon is listening.
352
353     """
354     raise NotImplementedError()
355
356   def _GetConnectedCheckEpoch(self):
357     """Returns timeout to calculate connect timeout.
358
359     """
360     raise NotImplementedError()
361
362   def CheckConnected(self):
363     """Checks whether the daemon is connected.
364
365     @rtype: bool
366     @return: Whether the daemon is connected
367
368     """
369     assert self._daemon, "Daemon status missing"
370
371     if self._ts_connected is not None:
372       return True
373
374     if self._daemon.connected:
375       self._ts_connected = time.time()
376
377       # TODO: Log remote peer
378       logging.debug("%s %r on %s is now connected",
379                     self.MODE_TEXT, self._daemon_name, self.node_name)
380
381       self._cbs.ReportConnected(self, self._private)
382
383       return True
384
385     if _TimeoutExpired(self._GetConnectedCheckEpoch(), self._timeouts.connect):
386       raise _ImportExportError("Not connected after %s seconds" %
387                                self._timeouts.connect)
388
389     return False
390
391   def _CheckProgress(self):
392     """Checks whether a progress update should be reported.
393
394     """
395     if ((self._ts_last_progress is None or
396          _TimeoutExpired(self._ts_last_progress, self._timeouts.progress)) and
397         self._daemon and
398         self._daemon.progress_mbytes is not None and
399         self._daemon.progress_throughput is not None):
400       self._cbs.ReportProgress(self, self._private)
401       self._ts_last_progress = time.time()
402
403   def CheckFinished(self):
404     """Checks whether the daemon exited.
405
406     @rtype: bool
407     @return: Whether the transfer is finished
408
409     """
410     assert self._daemon, "Daemon status missing"
411
412     if self._ts_finished:
413       return True
414
415     if self._daemon.exit_status is None:
416       # TODO: Adjust delay for ETA expiring soon
417       self._CheckProgress()
418       return False
419
420     self._ts_finished = time.time()
421
422     self._ReportFinished(self._daemon.exit_status == 0,
423                          self._daemon.error_message)
424
425     return True
426
427   def _ReportFinished(self, success, message):
428     """Transfer is finished or daemon exited.
429
430     @type success: bool
431     @param success: Whether the transfer was successful
432     @type message: string
433     @param message: Error message
434
435     """
436     assert self.success is None
437
438     self.success = success
439     self.final_message = message
440
441     if success:
442       logging.info("%s %r on %s succeeded", self.MODE_TEXT, self._daemon_name,
443                    self.node_name)
444     elif self._daemon_name:
445       self._lu.LogWarning("%s %r on %s failed: %s",
446                           self.MODE_TEXT, self._daemon_name, self.node_name,
447                           message)
448     else:
449       self._lu.LogWarning("%s on %s failed: %s", self.MODE_TEXT,
450                           self.node_name, message)
451
452     self._cbs.ReportFinished(self, self._private)
453
454   def _Finalize(self):
455     """Makes the RPC call to finalize this import/export.
456
457     """
458     return self._lu.rpc.call_impexp_cleanup(self.node_name, self._daemon_name)
459
460   def Finalize(self, error=None):
461     """Finalizes this import/export.
462
463     """
464     if self._daemon_name:
465       logging.info("Finalizing %s %r on %s",
466                    self.MODE_TEXT, self._daemon_name, self.node_name)
467
468       result = self._Finalize()
469       if result.fail_msg:
470         self._lu.LogWarning("Failed to finalize %s %r on %s: %s",
471                             self.MODE_TEXT, self._daemon_name,
472                             self.node_name, result.fail_msg)
473         return False
474
475       # Daemon is no longer running
476       self._daemon_name = None
477       self._ts_cleanup = time.time()
478
479     if error:
480       self._ReportFinished(False, error)
481
482     return True
483
484
485 class DiskImport(_DiskImportExportBase):
486   MODE_TEXT = "import"
487
488   def __init__(self, lu, node_name, opts, instance,
489                dest, dest_args, timeouts, cbs, private=None):
490     """Initializes this class.
491
492     @param lu: Logical unit instance
493     @type node_name: string
494     @param node_name: Node name for import
495     @type opts: L{objects.ImportExportOptions}
496     @param opts: Import/export daemon options
497     @type instance: L{objects.Instance}
498     @param instance: Instance object
499     @param dest: I/O destination
500     @param dest_args: I/O arguments
501     @type timeouts: L{ImportExportTimeouts}
502     @param timeouts: Timeouts for this import
503     @type cbs: L{ImportExportCbBase}
504     @param cbs: Callbacks
505     @param private: Private data for callback functions
506
507     """
508     _DiskImportExportBase.__init__(self, lu, node_name, opts,
509                                    instance, timeouts, cbs, private)
510     self._dest = dest
511     self._dest_args = dest_args
512
513     # Timestamps
514     self._ts_listening = None
515
516   @property
517   def listen_port(self):
518     """Returns the port the daemon is listening on.
519
520     """
521     if self._daemon:
522       return self._daemon.listen_port
523
524     return None
525
526   def _StartDaemon(self):
527     """Starts the import daemon.
528
529     """
530     return self._lu.rpc.call_import_start(self.node_name, self._opts,
531                                           self._instance,
532                                           self._dest, self._dest_args)
533
534   def CheckListening(self):
535     """Checks whether the daemon is listening.
536
537     @rtype: bool
538     @return: Whether the daemon is listening
539
540     """
541     assert self._daemon, "Daemon status missing"
542
543     if self._ts_listening is not None:
544       return True
545
546     port = self._daemon.listen_port
547     if port is not None:
548       self._ts_listening = time.time()
549
550       logging.debug("Import %r on %s is now listening on port %s",
551                     self._daemon_name, self.node_name, port)
552
553       self._cbs.ReportListening(self, self._private)
554
555       return True
556
557     if _TimeoutExpired(self._ts_begin, self._timeouts.listen):
558       raise _ImportExportError("Not listening after %s seconds" %
559                                self._timeouts.listen)
560
561     return False
562
563   def _GetConnectedCheckEpoch(self):
564     """Returns the time since we started listening.
565
566     """
567     assert self._ts_listening is not None, \
568            ("Checking whether an import is connected is only useful"
569             " once it's been listening")
570
571     return self._ts_listening
572
573
574 class DiskExport(_DiskImportExportBase):
575   MODE_TEXT = "export"
576
577   def __init__(self, lu, node_name, opts,
578                dest_host, dest_port, instance, source, source_args,
579                timeouts, cbs, private=None):
580     """Initializes this class.
581
582     @param lu: Logical unit instance
583     @type node_name: string
584     @param node_name: Node name for import
585     @type opts: L{objects.ImportExportOptions}
586     @param opts: Import/export daemon options
587     @type dest_host: string
588     @param dest_host: Destination host name or IP address
589     @type dest_port: number
590     @param dest_port: Destination port number
591     @type instance: L{objects.Instance}
592     @param instance: Instance object
593     @param source: I/O source
594     @param source_args: I/O source
595     @type timeouts: L{ImportExportTimeouts}
596     @param timeouts: Timeouts for this import
597     @type cbs: L{ImportExportCbBase}
598     @param cbs: Callbacks
599     @param private: Private data for callback functions
600
601     """
602     _DiskImportExportBase.__init__(self, lu, node_name, opts,
603                                    instance, timeouts, cbs, private)
604     self._dest_host = dest_host
605     self._dest_port = dest_port
606     self._source = source
607     self._source_args = source_args
608
609   def _StartDaemon(self):
610     """Starts the export daemon.
611
612     """
613     return self._lu.rpc.call_export_start(self.node_name, self._opts,
614                                           self._dest_host, self._dest_port,
615                                           self._instance, self._source,
616                                           self._source_args)
617
618   def CheckListening(self):
619     """Checks whether the daemon is listening.
620
621     """
622     # Only an import can be listening
623     return True
624
625   def _GetConnectedCheckEpoch(self):
626     """Returns the time since the daemon started.
627
628     """
629     assert self._ts_begin is not None
630
631     return self._ts_begin
632
633
634 def FormatProgress(progress):
635   """Formats progress information for user consumption
636
637   """
638   (mbytes, throughput, percent, eta) = progress
639
640   parts = [
641     utils.FormatUnit(mbytes, "h"),
642
643     # Not using FormatUnit as it doesn't support kilobytes
644     "%0.1f MiB/s" % throughput,
645     ]
646
647   if percent is not None:
648     parts.append("%d%%" % percent)
649
650   if eta is not None:
651     parts.append("ETA %s" % utils.FormatSeconds(eta))
652
653   return utils.CommaJoin(parts)
654
655
656 class ImportExportLoop:
657   MIN_DELAY = 1.0
658   MAX_DELAY = 20.0
659
660   def __init__(self, lu):
661     """Initializes this class.
662
663     """
664     self._lu = lu
665     self._queue = []
666     self._pending_add = []
667
668   def Add(self, diskie):
669     """Adds an import/export object to the loop.
670
671     @type diskie: Subclass of L{_DiskImportExportBase}
672     @param diskie: Import/export object
673
674     """
675     assert diskie not in self._pending_add
676     assert diskie.loop is None
677
678     diskie.SetLoop(self)
679
680     # Adding new objects to a staging list is necessary, otherwise the main
681     # loop gets confused if callbacks modify the queue while the main loop is
682     # iterating over it.
683     self._pending_add.append(diskie)
684
685   @staticmethod
686   def _CollectDaemonStatus(lu, daemons):
687     """Collects the status for all import/export daemons.
688
689     """
690     daemon_status = {}
691
692     for node_name, names in daemons.iteritems():
693       result = lu.rpc.call_impexp_status(node_name, names)
694       if result.fail_msg:
695         lu.LogWarning("Failed to get daemon status on %s: %s",
696                       node_name, result.fail_msg)
697         continue
698
699       assert len(names) == len(result.payload)
700
701       daemon_status[node_name] = dict(zip(names, result.payload))
702
703     return daemon_status
704
705   @staticmethod
706   def _GetActiveDaemonNames(queue):
707     """Gets the names of all active daemons.
708
709     """
710     result = {}
711     for diskie in queue:
712       if not diskie.active:
713         continue
714
715       try:
716         # Start daemon if necessary
717         daemon_name = diskie.CheckDaemon()
718       except _ImportExportError, err:
719         logging.exception("%s failed", diskie.MODE_TEXT)
720         diskie.Finalize(error=str(err))
721         continue
722
723       result.setdefault(diskie.node_name, []).append(daemon_name)
724
725     assert len(queue) >= len(result)
726     assert len(queue) >= sum([len(names) for names in result.itervalues()])
727
728     logging.debug("daemons=%r", result)
729
730     return result
731
732   def _AddPendingToQueue(self):
733     """Adds all pending import/export objects to the internal queue.
734
735     """
736     assert compat.all(diskie not in self._queue and diskie.loop == self
737                       for diskie in self._pending_add)
738
739     self._queue.extend(self._pending_add)
740
741     del self._pending_add[:]
742
743   def Run(self):
744     """Utility main loop.
745
746     """
747     while True:
748       self._AddPendingToQueue()
749
750       # Collect all active daemon names
751       daemons = self._GetActiveDaemonNames(self._queue)
752       if not daemons:
753         break
754
755       # Collection daemon status data
756       data = self._CollectDaemonStatus(self._lu, daemons)
757
758       # Use data
759       delay = self.MAX_DELAY
760       for diskie in self._queue:
761         if not diskie.active:
762           continue
763
764         try:
765           try:
766             all_daemon_data = data[diskie.node_name]
767           except KeyError:
768             result = diskie.SetDaemonData(False, None)
769           else:
770             result = \
771               diskie.SetDaemonData(True,
772                                    all_daemon_data[diskie.GetDaemonName()])
773
774           if not result:
775             # Daemon not yet ready, retry soon
776             delay = min(3.0, delay)
777             continue
778
779           if diskie.CheckFinished():
780             # Transfer finished
781             diskie.Finalize()
782             continue
783
784           # Normal case: check again in 5 seconds
785           delay = min(5.0, delay)
786
787           if not diskie.CheckListening():
788             # Not yet listening, retry soon
789             delay = min(1.0, delay)
790             continue
791
792           if not diskie.CheckConnected():
793             # Not yet connected, retry soon
794             delay = min(1.0, delay)
795             continue
796
797         except _ImportExportError, err:
798           logging.exception("%s failed", diskie.MODE_TEXT)
799           diskie.Finalize(error=str(err))
800
801       if not compat.any(diskie.active for diskie in self._queue):
802         break
803
804       # Wait a bit
805       delay = min(self.MAX_DELAY, max(self.MIN_DELAY, delay))
806       logging.debug("Waiting for %ss", delay)
807       time.sleep(delay)
808
809   def FinalizeAll(self):
810     """Finalizes all pending transfers.
811
812     """
813     success = True
814
815     for diskie in self._queue:
816       success = diskie.Finalize() and success
817
818     return success
819
820
821 class _TransferInstCbBase(ImportExportCbBase):
822   def __init__(self, lu, feedback_fn, instance, timeouts, src_node, src_cbs,
823                dest_node, dest_ip):
824     """Initializes this class.
825
826     """
827     ImportExportCbBase.__init__(self)
828
829     self.lu = lu
830     self.feedback_fn = feedback_fn
831     self.instance = instance
832     self.timeouts = timeouts
833     self.src_node = src_node
834     self.src_cbs = src_cbs
835     self.dest_node = dest_node
836     self.dest_ip = dest_ip
837
838
839 class _TransferInstSourceCb(_TransferInstCbBase):
840   def ReportConnected(self, ie, dtp):
841     """Called when a connection has been established.
842
843     """
844     assert self.src_cbs is None
845     assert dtp.src_export == ie
846     assert dtp.dest_import
847
848     self.feedback_fn("%s is sending data on %s" %
849                      (dtp.data.name, ie.node_name))
850
851   def ReportProgress(self, ie, dtp):
852     """Called when new progress information should be reported.
853
854     """
855     progress = ie.progress
856     if not progress:
857       return
858
859     self.feedback_fn("%s sent %s" % (dtp.data.name, FormatProgress(progress)))
860
861   def ReportFinished(self, ie, dtp):
862     """Called when a transfer has finished.
863
864     """
865     assert self.src_cbs is None
866     assert dtp.src_export == ie
867     assert dtp.dest_import
868
869     if ie.success:
870       self.feedback_fn("%s finished sending data" % dtp.data.name)
871     else:
872       self.feedback_fn("%s failed to send data: %s (recent output: %s)" %
873                        (dtp.data.name, ie.final_message, ie.recent_output))
874
875     dtp.RecordResult(ie.success)
876
877     cb = dtp.data.finished_fn
878     if cb:
879       cb()
880
881     # TODO: Check whether sending SIGTERM right away is okay, maybe we should
882     # give the daemon a moment to sort things out
883     if dtp.dest_import and not ie.success:
884       dtp.dest_import.Abort()
885
886
887 class _TransferInstDestCb(_TransferInstCbBase):
888   def ReportListening(self, ie, dtp):
889     """Called when daemon started listening.
890
891     """
892     assert self.src_cbs
893     assert dtp.src_export is None
894     assert dtp.dest_import
895     assert dtp.export_opts
896
897     self.feedback_fn("%s is now listening, starting export" % dtp.data.name)
898
899     # Start export on source node
900     de = DiskExport(self.lu, self.src_node, dtp.export_opts,
901                     self.dest_ip, ie.listen_port,
902                     self.instance, dtp.data.src_io, dtp.data.src_ioargs,
903                     self.timeouts, self.src_cbs, private=dtp)
904     ie.loop.Add(de)
905
906     dtp.src_export = de
907
908   def ReportConnected(self, ie, dtp):
909     """Called when a connection has been established.
910
911     """
912     self.feedback_fn("%s is receiving data on %s" %
913                      (dtp.data.name, self.dest_node))
914
915   def ReportFinished(self, ie, dtp):
916     """Called when a transfer has finished.
917
918     """
919     if ie.success:
920       self.feedback_fn("%s finished receiving data" % dtp.data.name)
921     else:
922       self.feedback_fn("%s failed to receive data: %s (recent output: %s)" %
923                        (dtp.data.name, ie.final_message, ie.recent_output))
924
925     dtp.RecordResult(ie.success)
926
927     # TODO: Check whether sending SIGTERM right away is okay, maybe we should
928     # give the daemon a moment to sort things out
929     if dtp.src_export and not ie.success:
930       dtp.src_export.Abort()
931
932
933 class DiskTransfer(object):
934   def __init__(self, name, src_io, src_ioargs, dest_io, dest_ioargs,
935                finished_fn):
936     """Initializes this class.
937
938     @type name: string
939     @param name: User-visible name for this transfer (e.g. "disk/0")
940     @param src_io: Source I/O type
941     @param src_ioargs: Source I/O arguments
942     @param dest_io: Destination I/O type
943     @param dest_ioargs: Destination I/O arguments
944     @type finished_fn: callable
945     @param finished_fn: Function called once transfer has finished
946
947     """
948     self.name = name
949
950     self.src_io = src_io
951     self.src_ioargs = src_ioargs
952
953     self.dest_io = dest_io
954     self.dest_ioargs = dest_ioargs
955
956     self.finished_fn = finished_fn
957
958
959 class _DiskTransferPrivate(object):
960   def __init__(self, data, success, export_opts):
961     """Initializes this class.
962
963     @type data: L{DiskTransfer}
964     @type success: bool
965
966     """
967     self.data = data
968     self.success = success
969     self.export_opts = export_opts
970
971     self.src_export = None
972     self.dest_import = None
973
974   def RecordResult(self, success):
975     """Updates the status.
976
977     One failed part will cause the whole transfer to fail.
978
979     """
980     self.success = self.success and success
981
982
983 def _GetInstDiskMagic(base, instance_name, index):
984   """Computes the magic value for a disk export or import.
985
986   @type base: string
987   @param base: Random seed value (can be the same for all disks of a transfer)
988   @type instance_name: string
989   @param instance_name: Name of instance
990   @type index: number
991   @param index: Disk index
992
993   """
994   h = compat.sha1_hash()
995   h.update(str(constants.RIE_VERSION))
996   h.update(base)
997   h.update(instance_name)
998   h.update(str(index))
999   return h.hexdigest()
1000
1001
1002 def TransferInstanceData(lu, feedback_fn, src_node, dest_node, dest_ip,
1003                          instance, all_transfers):
1004   """Transfers an instance's data from one node to another.
1005
1006   @param lu: Logical unit instance
1007   @param feedback_fn: Feedback function
1008   @type src_node: string
1009   @param src_node: Source node name
1010   @type dest_node: string
1011   @param dest_node: Destination node name
1012   @type dest_ip: string
1013   @param dest_ip: IP address of destination node
1014   @type instance: L{objects.Instance}
1015   @param instance: Instance object
1016   @type all_transfers: list of L{DiskTransfer} instances
1017   @param all_transfers: List of all disk transfers to be made
1018   @rtype: list
1019   @return: List with a boolean (True=successful, False=failed) for success for
1020            each transfer
1021
1022   """
1023   # Disable compression for all moves as these are all within the same cluster
1024   compress = constants.IEC_NONE
1025
1026   logging.debug("Source node %s, destination node %s, compression '%s'",
1027                 src_node, dest_node, compress)
1028
1029   timeouts = ImportExportTimeouts(constants.DISK_TRANSFER_CONNECT_TIMEOUT)
1030   src_cbs = _TransferInstSourceCb(lu, feedback_fn, instance, timeouts,
1031                                   src_node, None, dest_node, dest_ip)
1032   dest_cbs = _TransferInstDestCb(lu, feedback_fn, instance, timeouts,
1033                                  src_node, src_cbs, dest_node, dest_ip)
1034
1035   all_dtp = []
1036
1037   base_magic = utils.GenerateSecret(6)
1038
1039   ieloop = ImportExportLoop(lu)
1040   try:
1041     for idx, transfer in enumerate(all_transfers):
1042       if transfer:
1043         feedback_fn("Exporting %s from %s to %s" %
1044                     (transfer.name, src_node, dest_node))
1045
1046         magic = _GetInstDiskMagic(base_magic, instance.name, idx)
1047         opts = objects.ImportExportOptions(key_name=None, ca_pem=None,
1048                                            compress=compress, magic=magic)
1049
1050         dtp = _DiskTransferPrivate(transfer, True, opts)
1051
1052         di = DiskImport(lu, dest_node, opts, instance,
1053                         transfer.dest_io, transfer.dest_ioargs,
1054                         timeouts, dest_cbs, private=dtp)
1055         ieloop.Add(di)
1056
1057         dtp.dest_import = di
1058       else:
1059         dtp = _DiskTransferPrivate(None, False, None)
1060
1061       all_dtp.append(dtp)
1062
1063     ieloop.Run()
1064   finally:
1065     ieloop.FinalizeAll()
1066
1067   assert len(all_dtp) == len(all_transfers)
1068   assert compat.all((dtp.src_export is None or
1069                       dtp.src_export.success is not None) and
1070                      (dtp.dest_import is None or
1071                       dtp.dest_import.success is not None)
1072                      for dtp in all_dtp), \
1073          "Not all imports/exports are finalized"
1074
1075   return [bool(dtp.success) for dtp in all_dtp]
1076
1077
1078 class _RemoteExportCb(ImportExportCbBase):
1079   def __init__(self, feedback_fn, disk_count):
1080     """Initializes this class.
1081
1082     """
1083     ImportExportCbBase.__init__(self)
1084     self._feedback_fn = feedback_fn
1085     self._dresults = [None] * disk_count
1086
1087   @property
1088   def disk_results(self):
1089     """Returns per-disk results.
1090
1091     """
1092     return self._dresults
1093
1094   def ReportConnected(self, ie, private):
1095     """Called when a connection has been established.
1096
1097     """
1098     (idx, _) = private
1099
1100     self._feedback_fn("Disk %s is now sending data" % idx)
1101
1102   def ReportProgress(self, ie, private):
1103     """Called when new progress information should be reported.
1104
1105     """
1106     (idx, _) = private
1107
1108     progress = ie.progress
1109     if not progress:
1110       return
1111
1112     self._feedback_fn("Disk %s sent %s" % (idx, FormatProgress(progress)))
1113
1114   def ReportFinished(self, ie, private):
1115     """Called when a transfer has finished.
1116
1117     """
1118     (idx, finished_fn) = private
1119
1120     if ie.success:
1121       self._feedback_fn("Disk %s finished sending data" % idx)
1122     else:
1123       self._feedback_fn("Disk %s failed to send data: %s (recent output: %s)" %
1124                         (idx, ie.final_message, ie.recent_output))
1125
1126     self._dresults[idx] = bool(ie.success)
1127
1128     if finished_fn:
1129       finished_fn()
1130
1131
1132 class ExportInstanceHelper:
1133   def __init__(self, lu, feedback_fn, instance):
1134     """Initializes this class.
1135
1136     @param lu: Logical unit instance
1137     @param feedback_fn: Feedback function
1138     @type instance: L{objects.Instance}
1139     @param instance: Instance object
1140
1141     """
1142     self._lu = lu
1143     self._feedback_fn = feedback_fn
1144     self._instance = instance
1145
1146     self._snap_disks = []
1147     self._removed_snaps = [False] * len(instance.disks)
1148
1149   def CreateSnapshots(self):
1150     """Creates an LVM snapshot for every disk of the instance.
1151
1152     """
1153     assert not self._snap_disks
1154
1155     instance = self._instance
1156     src_node = instance.primary_node
1157
1158     for idx, disk in enumerate(instance.disks):
1159       self._feedback_fn("Creating a snapshot of disk/%s on node %s" %
1160                         (idx, src_node))
1161
1162       # result.payload will be a snapshot of an lvm leaf of the one we
1163       # passed
1164       result = self._lu.rpc.call_blockdev_snapshot(src_node, disk)
1165       new_dev = False
1166       msg = result.fail_msg
1167       if msg:
1168         self._lu.LogWarning("Could not snapshot disk/%s on node %s: %s",
1169                             idx, src_node, msg)
1170       elif (not isinstance(result.payload, (tuple, list)) or
1171             len(result.payload) != 2):
1172         self._lu.LogWarning("Could not snapshot disk/%s on node %s: invalid"
1173                             " result '%s'", idx, src_node, result.payload)
1174       else:
1175         disk_id = tuple(result.payload)
1176         new_dev = objects.Disk(dev_type=constants.LD_LV, size=disk.size,
1177                                logical_id=disk_id, physical_id=disk_id,
1178                                iv_name=disk.iv_name)
1179
1180       self._snap_disks.append(new_dev)
1181
1182     assert len(self._snap_disks) == len(instance.disks)
1183     assert len(self._removed_snaps) == len(instance.disks)
1184
1185   def _RemoveSnapshot(self, disk_index):
1186     """Removes an LVM snapshot.
1187
1188     @type disk_index: number
1189     @param disk_index: Index of the snapshot to be removed
1190
1191     """
1192     disk = self._snap_disks[disk_index]
1193     if disk and not self._removed_snaps[disk_index]:
1194       src_node = self._instance.primary_node
1195
1196       self._feedback_fn("Removing snapshot of disk/%s on node %s" %
1197                         (disk_index, src_node))
1198
1199       result = self._lu.rpc.call_blockdev_remove(src_node, disk)
1200       if result.fail_msg:
1201         self._lu.LogWarning("Could not remove snapshot for disk/%d from node"
1202                             " %s: %s", disk_index, src_node, result.fail_msg)
1203       else:
1204         self._removed_snaps[disk_index] = True
1205
1206   def LocalExport(self, dest_node):
1207     """Intra-cluster instance export.
1208
1209     @type dest_node: L{objects.Node}
1210     @param dest_node: Destination node
1211
1212     """
1213     instance = self._instance
1214     src_node = instance.primary_node
1215
1216     assert len(self._snap_disks) == len(instance.disks)
1217
1218     transfers = []
1219
1220     for idx, dev in enumerate(self._snap_disks):
1221       if not dev:
1222         transfers.append(None)
1223         continue
1224
1225       path = utils.PathJoin(constants.EXPORT_DIR, "%s.new" % instance.name,
1226                             dev.physical_id[1])
1227
1228       finished_fn = compat.partial(self._TransferFinished, idx)
1229
1230       # FIXME: pass debug option from opcode to backend
1231       dt = DiskTransfer("snapshot/%s" % idx,
1232                         constants.IEIO_SCRIPT, (dev, idx),
1233                         constants.IEIO_FILE, (path, ),
1234                         finished_fn)
1235       transfers.append(dt)
1236
1237     # Actually export data
1238     dresults = TransferInstanceData(self._lu, self._feedback_fn,
1239                                     src_node, dest_node.name,
1240                                     dest_node.secondary_ip,
1241                                     instance, transfers)
1242
1243     assert len(dresults) == len(instance.disks)
1244
1245     self._feedback_fn("Finalizing export on %s" % dest_node.name)
1246     result = self._lu.rpc.call_finalize_export(dest_node.name, instance,
1247                                                self._snap_disks)
1248     msg = result.fail_msg
1249     fin_resu = not msg
1250     if msg:
1251       self._lu.LogWarning("Could not finalize export for instance %s"
1252                           " on node %s: %s", instance.name, dest_node.name, msg)
1253
1254     return (fin_resu, dresults)
1255
1256   def RemoteExport(self, disk_info, key_name, dest_ca_pem, timeouts):
1257     """Inter-cluster instance export.
1258
1259     @type disk_info: list
1260     @param disk_info: Per-disk destination information
1261     @type key_name: string
1262     @param key_name: Name of X509 key to use
1263     @type dest_ca_pem: string
1264     @param dest_ca_pem: Destination X509 CA in PEM format
1265     @type timeouts: L{ImportExportTimeouts}
1266     @param timeouts: Timeouts for this import
1267
1268     """
1269     instance = self._instance
1270
1271     assert len(disk_info) == len(instance.disks)
1272
1273     cbs = _RemoteExportCb(self._feedback_fn, len(instance.disks))
1274
1275     ieloop = ImportExportLoop(self._lu)
1276     try:
1277       for idx, (dev, (host, port, magic)) in enumerate(zip(instance.disks,
1278                                                            disk_info)):
1279         # Decide whether to use IPv6
1280         ipv6 = netutils.IP6Address.IsValid(host)
1281
1282         opts = objects.ImportExportOptions(key_name=key_name,
1283                                            ca_pem=dest_ca_pem,
1284                                            magic=magic, ipv6=ipv6)
1285
1286         self._feedback_fn("Sending disk %s to %s:%s" % (idx, host, port))
1287         finished_fn = compat.partial(self._TransferFinished, idx)
1288         ieloop.Add(DiskExport(self._lu, instance.primary_node,
1289                               opts, host, port, instance,
1290                               constants.IEIO_SCRIPT, (dev, idx),
1291                               timeouts, cbs, private=(idx, finished_fn)))
1292
1293       ieloop.Run()
1294     finally:
1295       ieloop.FinalizeAll()
1296
1297     return (True, cbs.disk_results)
1298
1299   def _TransferFinished(self, idx):
1300     """Called once a transfer has finished.
1301
1302     @type idx: number
1303     @param idx: Disk index
1304
1305     """
1306     logging.debug("Transfer %s finished", idx)
1307     self._RemoveSnapshot(idx)
1308
1309   def Cleanup(self):
1310     """Remove all snapshots.
1311
1312     """
1313     assert len(self._removed_snaps) == len(self._instance.disks)
1314     for idx in range(len(self._instance.disks)):
1315       self._RemoveSnapshot(idx)
1316
1317
1318 class _RemoteImportCb(ImportExportCbBase):
1319   def __init__(self, feedback_fn, cds, x509_cert_pem, disk_count,
1320                external_address):
1321     """Initializes this class.
1322
1323     @type cds: string
1324     @param cds: Cluster domain secret
1325     @type x509_cert_pem: string
1326     @param x509_cert_pem: CA used for signing import key
1327     @type disk_count: number
1328     @param disk_count: Number of disks
1329     @type external_address: string
1330     @param external_address: External address of destination node
1331
1332     """
1333     ImportExportCbBase.__init__(self)
1334     self._feedback_fn = feedback_fn
1335     self._cds = cds
1336     self._x509_cert_pem = x509_cert_pem
1337     self._disk_count = disk_count
1338     self._external_address = external_address
1339
1340     self._dresults = [None] * disk_count
1341     self._daemon_port = [None] * disk_count
1342
1343     self._salt = utils.GenerateSecret(8)
1344
1345   @property
1346   def disk_results(self):
1347     """Returns per-disk results.
1348
1349     """
1350     return self._dresults
1351
1352   def _CheckAllListening(self):
1353     """Checks whether all daemons are listening.
1354
1355     If all daemons are listening, the information is sent to the client.
1356
1357     """
1358     if not compat.all(dp is not None for dp in self._daemon_port):
1359       return
1360
1361     host = self._external_address
1362
1363     disks = []
1364     for idx, (port, magic) in enumerate(self._daemon_port):
1365       disks.append(ComputeRemoteImportDiskInfo(self._cds, self._salt,
1366                                                idx, host, port, magic))
1367
1368     assert len(disks) == self._disk_count
1369
1370     self._feedback_fn(constants.ELOG_REMOTE_IMPORT, {
1371       "disks": disks,
1372       "x509_ca": self._x509_cert_pem,
1373       })
1374
1375   def ReportListening(self, ie, private):
1376     """Called when daemon started listening.
1377
1378     """
1379     (idx, ) = private
1380
1381     self._feedback_fn("Disk %s is now listening" % idx)
1382
1383     assert self._daemon_port[idx] is None
1384
1385     self._daemon_port[idx] = (ie.listen_port, ie.magic)
1386
1387     self._CheckAllListening()
1388
1389   def ReportConnected(self, ie, private):
1390     """Called when a connection has been established.
1391
1392     """
1393     (idx, ) = private
1394
1395     self._feedback_fn("Disk %s is now receiving data" % idx)
1396
1397   def ReportFinished(self, ie, private):
1398     """Called when a transfer has finished.
1399
1400     """
1401     (idx, ) = private
1402
1403     # Daemon is certainly no longer listening
1404     self._daemon_port[idx] = None
1405
1406     if ie.success:
1407       self._feedback_fn("Disk %s finished receiving data" % idx)
1408     else:
1409       self._feedback_fn(("Disk %s failed to receive data: %s"
1410                          " (recent output: %s)") %
1411                         (idx, ie.final_message, ie.recent_output))
1412
1413     self._dresults[idx] = bool(ie.success)
1414
1415
1416 def RemoteImport(lu, feedback_fn, instance, pnode, source_x509_ca,
1417                  cds, timeouts):
1418   """Imports an instance from another cluster.
1419
1420   @param lu: Logical unit instance
1421   @param feedback_fn: Feedback function
1422   @type instance: L{objects.Instance}
1423   @param instance: Instance object
1424   @type pnode: L{objects.Node}
1425   @param pnode: Primary node of instance as an object
1426   @type source_x509_ca: OpenSSL.crypto.X509
1427   @param source_x509_ca: Import source's X509 CA
1428   @type cds: string
1429   @param cds: Cluster domain secret
1430   @type timeouts: L{ImportExportTimeouts}
1431   @param timeouts: Timeouts for this import
1432
1433   """
1434   source_ca_pem = OpenSSL.crypto.dump_certificate(OpenSSL.crypto.FILETYPE_PEM,
1435                                                   source_x509_ca)
1436
1437   magic_base = utils.GenerateSecret(6)
1438
1439   # Decide whether to use IPv6
1440   ipv6 = netutils.IP6Address.IsValid(pnode.primary_ip)
1441
1442   # Create crypto key
1443   result = lu.rpc.call_x509_cert_create(instance.primary_node,
1444                                         constants.RIE_CERT_VALIDITY)
1445   result.Raise("Can't create X509 key and certificate on %s" % result.node)
1446
1447   (x509_key_name, x509_cert_pem) = result.payload
1448   try:
1449     # Load certificate
1450     x509_cert = OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_PEM,
1451                                                 x509_cert_pem)
1452
1453     # Sign certificate
1454     signed_x509_cert_pem = \
1455       utils.SignX509Certificate(x509_cert, cds, utils.GenerateSecret(8))
1456
1457     cbs = _RemoteImportCb(feedback_fn, cds, signed_x509_cert_pem,
1458                           len(instance.disks), pnode.primary_ip)
1459
1460     ieloop = ImportExportLoop(lu)
1461     try:
1462       for idx, dev in enumerate(instance.disks):
1463         magic = _GetInstDiskMagic(magic_base, instance.name, idx)
1464
1465         # Import daemon options
1466         opts = objects.ImportExportOptions(key_name=x509_key_name,
1467                                            ca_pem=source_ca_pem,
1468                                            magic=magic, ipv6=ipv6)
1469
1470         ieloop.Add(DiskImport(lu, instance.primary_node, opts, instance,
1471                               constants.IEIO_SCRIPT, (dev, idx),
1472                               timeouts, cbs, private=(idx, )))
1473
1474       ieloop.Run()
1475     finally:
1476       ieloop.FinalizeAll()
1477   finally:
1478     # Remove crypto key and certificate
1479     result = lu.rpc.call_x509_cert_remove(instance.primary_node, x509_key_name)
1480     result.Raise("Can't remove X509 key and certificate on %s" % result.node)
1481
1482   return cbs.disk_results
1483
1484
1485 def _GetImportExportHandshakeMessage(version):
1486   """Returns the handshake message for a RIE protocol version.
1487
1488   @type version: number
1489
1490   """
1491   return "%s:%s" % (version, constants.RIE_HANDSHAKE)
1492
1493
1494 def ComputeRemoteExportHandshake(cds):
1495   """Computes the remote import/export handshake.
1496
1497   @type cds: string
1498   @param cds: Cluster domain secret
1499
1500   """
1501   salt = utils.GenerateSecret(8)
1502   msg = _GetImportExportHandshakeMessage(constants.RIE_VERSION)
1503   return (constants.RIE_VERSION, utils.Sha1Hmac(cds, msg, salt=salt), salt)
1504
1505
1506 def CheckRemoteExportHandshake(cds, handshake):
1507   """Checks the handshake of a remote import/export.
1508
1509   @type cds: string
1510   @param cds: Cluster domain secret
1511   @type handshake: sequence
1512   @param handshake: Handshake sent by remote peer
1513
1514   """
1515   try:
1516     (version, hmac_digest, hmac_salt) = handshake
1517   except (TypeError, ValueError), err:
1518     return "Invalid data: %s" % err
1519
1520   if not utils.VerifySha1Hmac(cds, _GetImportExportHandshakeMessage(version),
1521                               hmac_digest, salt=hmac_salt):
1522     return "Hash didn't match, clusters don't share the same domain secret"
1523
1524   if version != constants.RIE_VERSION:
1525     return ("Clusters don't have the same remote import/export protocol"
1526             " (local=%s, remote=%s)" %
1527             (constants.RIE_VERSION, version))
1528
1529   return None
1530
1531
1532 def _GetRieDiskInfoMessage(disk_index, host, port, magic):
1533   """Returns the hashed text for import/export disk information.
1534
1535   @type disk_index: number
1536   @param disk_index: Index of disk (included in hash)
1537   @type host: string
1538   @param host: Hostname
1539   @type port: number
1540   @param port: Daemon port
1541   @type magic: string
1542   @param magic: Magic value
1543
1544   """
1545   return "%s:%s:%s:%s" % (disk_index, host, port, magic)
1546
1547
1548 def CheckRemoteExportDiskInfo(cds, disk_index, disk_info):
1549   """Verifies received disk information for an export.
1550
1551   @type cds: string
1552   @param cds: Cluster domain secret
1553   @type disk_index: number
1554   @param disk_index: Index of disk (included in hash)
1555   @type disk_info: sequence
1556   @param disk_info: Disk information sent by remote peer
1557
1558   """
1559   try:
1560     (host, port, magic, hmac_digest, hmac_salt) = disk_info
1561   except (TypeError, ValueError), err:
1562     raise errors.GenericError("Invalid data: %s" % err)
1563
1564   if not (host and port and magic):
1565     raise errors.GenericError("Missing destination host, port or magic")
1566
1567   msg = _GetRieDiskInfoMessage(disk_index, host, port, magic)
1568
1569   if not utils.VerifySha1Hmac(cds, msg, hmac_digest, salt=hmac_salt):
1570     raise errors.GenericError("HMAC is wrong")
1571
1572   if netutils.IP6Address.IsValid(host) or netutils.IP4Address.IsValid(host):
1573     destination = host
1574   else:
1575     destination = netutils.Hostname.GetNormalizedName(host)
1576
1577   return (destination,
1578           utils.ValidateServiceName(port),
1579           magic)
1580
1581
1582 def ComputeRemoteImportDiskInfo(cds, salt, disk_index, host, port, magic):
1583   """Computes the signed disk information for a remote import.
1584
1585   @type cds: string
1586   @param cds: Cluster domain secret
1587   @type salt: string
1588   @param salt: HMAC salt
1589   @type disk_index: number
1590   @param disk_index: Index of disk (included in hash)
1591   @type host: string
1592   @param host: Hostname
1593   @type port: number
1594   @param port: Daemon port
1595   @type magic: string
1596   @param magic: Magic value
1597
1598   """
1599   msg = _GetRieDiskInfoMessage(disk_index, host, port, magic)
1600   hmac_digest = utils.Sha1Hmac(cds, msg, salt=salt)
1601   return (host, port, magic, hmac_digest, salt)