Fix parameter names in SimpleFillBE/NIC docstrings
[ganeti-local] / lib / masterd / instance.py
1 #
2 #
3
4 # Copyright (C) 2010 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Instance-related functions and classes for masterd.
23
24 """
25
26 import logging
27 import time
28 import OpenSSL
29
30 from ganeti import constants
31 from ganeti import errors
32 from ganeti import compat
33 from ganeti import utils
34 from ganeti import objects
35
36
37 class _ImportExportError(Exception):
38   """Local exception to report import/export errors.
39
40   """
41
42
43 class ImportExportTimeouts(object):
44   #: Time until daemon starts writing status file
45   DEFAULT_READY_TIMEOUT = 10
46
47   #: Length of time until errors cause hard failure
48   DEFAULT_ERROR_TIMEOUT = 10
49
50   #: Time after which daemon must be listening
51   DEFAULT_LISTEN_TIMEOUT = 10
52
53   #: Progress update interval
54   DEFAULT_PROGRESS_INTERVAL = 60
55
56   __slots__ = [
57     "error",
58     "ready",
59     "listen",
60     "connect",
61     "progress",
62     ]
63
64   def __init__(self, connect,
65                listen=DEFAULT_LISTEN_TIMEOUT,
66                error=DEFAULT_ERROR_TIMEOUT,
67                ready=DEFAULT_READY_TIMEOUT,
68                progress=DEFAULT_PROGRESS_INTERVAL):
69     """Initializes this class.
70
71     @type connect: number
72     @param connect: Timeout for establishing connection
73     @type listen: number
74     @param listen: Timeout for starting to listen for connections
75     @type error: number
76     @param error: Length of time until errors cause hard failure
77     @type ready: number
78     @param ready: Timeout for daemon to become ready
79     @type progress: number
80     @param progress: Progress update interval
81
82     """
83     self.error = error
84     self.ready = ready
85     self.listen = listen
86     self.connect = connect
87     self.progress = progress
88
89
90 class ImportExportCbBase(object):
91   """Callbacks for disk import/export.
92
93   """
94   def ReportListening(self, ie, private):
95     """Called when daemon started listening.
96
97     @type ie: Subclass of L{_DiskImportExportBase}
98     @param ie: Import/export object
99     @param private: Private data passed to import/export object
100
101     """
102
103   def ReportConnected(self, ie, private):
104     """Called when a connection has been established.
105
106     @type ie: Subclass of L{_DiskImportExportBase}
107     @param ie: Import/export object
108     @param private: Private data passed to import/export object
109
110     """
111
112   def ReportProgress(self, ie, private):
113     """Called when new progress information should be reported.
114
115     @type ie: Subclass of L{_DiskImportExportBase}
116     @param ie: Import/export object
117     @param private: Private data passed to import/export object
118
119     """
120
121   def ReportFinished(self, ie, private):
122     """Called when a transfer has finished.
123
124     @type ie: Subclass of L{_DiskImportExportBase}
125     @param ie: Import/export object
126     @param private: Private data passed to import/export object
127
128     """
129
130
131 def _TimeoutExpired(epoch, timeout, _time_fn=time.time):
132   """Checks whether a timeout has expired.
133
134   """
135   return _time_fn() > (epoch + timeout)
136
137
138 class _DiskImportExportBase(object):
139   MODE_TEXT = None
140
141   def __init__(self, lu, node_name, opts,
142                instance, timeouts, cbs, private=None):
143     """Initializes this class.
144
145     @param lu: Logical unit instance
146     @type node_name: string
147     @param node_name: Node name for import
148     @type opts: L{objects.ImportExportOptions}
149     @param opts: Import/export daemon options
150     @type instance: L{objects.Instance}
151     @param instance: Instance object
152     @type timeouts: L{ImportExportTimeouts}
153     @param timeouts: Timeouts for this import
154     @type cbs: L{ImportExportCbBase}
155     @param cbs: Callbacks
156     @param private: Private data for callback functions
157
158     """
159     assert self.MODE_TEXT
160
161     self._lu = lu
162     self.node_name = node_name
163     self._opts = opts
164     self._instance = instance
165     self._timeouts = timeouts
166     self._cbs = cbs
167     self._private = private
168
169     # Parent loop
170     self._loop = None
171
172     # Timestamps
173     self._ts_begin = None
174     self._ts_connected = None
175     self._ts_finished = None
176     self._ts_cleanup = None
177     self._ts_last_progress = None
178     self._ts_last_error = None
179
180     # Transfer status
181     self.success = None
182     self.final_message = None
183
184     # Daemon status
185     self._daemon_name = None
186     self._daemon = None
187
188   @property
189   def recent_output(self):
190     """Returns the most recent output from the daemon.
191
192     """
193     if self._daemon:
194       return self._daemon.recent_output
195
196     return None
197
198   @property
199   def progress(self):
200     """Returns transfer progress information.
201
202     """
203     if not self._daemon:
204       return None
205
206     return (self._daemon.progress_mbytes,
207             self._daemon.progress_throughput,
208             self._daemon.progress_percent,
209             self._daemon.progress_eta)
210
211   @property
212   def magic(self):
213     """Returns the magic value for this import/export.
214
215     """
216     return self._opts.magic
217
218   @property
219   def active(self):
220     """Determines whether this transport is still active.
221
222     """
223     return self.success is None
224
225   @property
226   def loop(self):
227     """Returns parent loop.
228
229     @rtype: L{ImportExportLoop}
230
231     """
232     return self._loop
233
234   def SetLoop(self, loop):
235     """Sets the parent loop.
236
237     @type loop: L{ImportExportLoop}
238
239     """
240     if self._loop:
241       raise errors.ProgrammerError("Loop can only be set once")
242
243     self._loop = loop
244
245   def _StartDaemon(self):
246     """Starts the import/export daemon.
247
248     """
249     raise NotImplementedError()
250
251   def CheckDaemon(self):
252     """Checks whether daemon has been started and if not, starts it.
253
254     @rtype: string
255     @return: Daemon name
256
257     """
258     assert self._ts_cleanup is None
259
260     if self._daemon_name is None:
261       assert self._ts_begin is None
262
263       result = self._StartDaemon()
264       if result.fail_msg:
265         raise _ImportExportError("Failed to start %s on %s: %s" %
266                                  (self.MODE_TEXT, self.node_name,
267                                   result.fail_msg))
268
269       daemon_name = result.payload
270
271       logging.info("Started %s %r on %s", self.MODE_TEXT, daemon_name,
272                    self.node_name)
273
274       self._ts_begin = time.time()
275       self._daemon_name = daemon_name
276
277     return self._daemon_name
278
279   def GetDaemonName(self):
280     """Returns the daemon name.
281
282     """
283     assert self._daemon_name, "Daemon has not been started"
284     assert self._ts_cleanup is None
285     return self._daemon_name
286
287   def Abort(self):
288     """Sends SIGTERM to import/export daemon (if still active).
289
290     """
291     if self._daemon_name:
292       self._lu.LogWarning("Aborting %s %r on %s",
293                           self.MODE_TEXT, self._daemon_name, self.node_name)
294       result = self._lu.rpc.call_impexp_abort(self.node_name, self._daemon_name)
295       if result.fail_msg:
296         self._lu.LogWarning("Failed to abort %s %r on %s: %s",
297                             self.MODE_TEXT, self._daemon_name,
298                             self.node_name, result.fail_msg)
299         return False
300
301     return True
302
303   def _SetDaemonData(self, data):
304     """Internal function for updating status daemon data.
305
306     @type data: L{objects.ImportExportStatus}
307     @param data: Daemon status data
308
309     """
310     assert self._ts_begin is not None
311
312     if not data:
313       if _TimeoutExpired(self._ts_begin, self._timeouts.ready):
314         raise _ImportExportError("Didn't become ready after %s seconds" %
315                                  self._timeouts.ready)
316
317       return False
318
319     self._daemon = data
320
321     return True
322
323   def SetDaemonData(self, success, data):
324     """Updates daemon status data.
325
326     @type success: bool
327     @param success: Whether fetching data was successful or not
328     @type data: L{objects.ImportExportStatus}
329     @param data: Daemon status data
330
331     """
332     if not success:
333       if self._ts_last_error is None:
334         self._ts_last_error = time.time()
335
336       elif _TimeoutExpired(self._ts_last_error, self._timeouts.error):
337         raise _ImportExportError("Too many errors while updating data")
338
339       return False
340
341     self._ts_last_error = None
342
343     return self._SetDaemonData(data)
344
345   def CheckListening(self):
346     """Checks whether the daemon is listening.
347
348     """
349     raise NotImplementedError()
350
351   def _GetConnectedCheckEpoch(self):
352     """Returns timeout to calculate connect timeout.
353
354     """
355     raise NotImplementedError()
356
357   def CheckConnected(self):
358     """Checks whether the daemon is connected.
359
360     @rtype: bool
361     @return: Whether the daemon is connected
362
363     """
364     assert self._daemon, "Daemon status missing"
365
366     if self._ts_connected is not None:
367       return True
368
369     if self._daemon.connected:
370       self._ts_connected = time.time()
371
372       # TODO: Log remote peer
373       logging.debug("%s %r on %s is now connected",
374                     self.MODE_TEXT, self._daemon_name, self.node_name)
375
376       self._cbs.ReportConnected(self, self._private)
377
378       return True
379
380     if _TimeoutExpired(self._GetConnectedCheckEpoch(), self._timeouts.connect):
381       raise _ImportExportError("Not connected after %s seconds" %
382                                self._timeouts.connect)
383
384     return False
385
386   def _CheckProgress(self):
387     """Checks whether a progress update should be reported.
388
389     """
390     if ((self._ts_last_progress is None or
391          _TimeoutExpired(self._ts_last_progress, self._timeouts.progress)) and
392         self._daemon and
393         self._daemon.progress_mbytes is not None and
394         self._daemon.progress_throughput is not None):
395       self._cbs.ReportProgress(self, self._private)
396       self._ts_last_progress = time.time()
397
398   def CheckFinished(self):
399     """Checks whether the daemon exited.
400
401     @rtype: bool
402     @return: Whether the transfer is finished
403
404     """
405     assert self._daemon, "Daemon status missing"
406
407     if self._ts_finished:
408       return True
409
410     if self._daemon.exit_status is None:
411       # TODO: Adjust delay for ETA expiring soon
412       self._CheckProgress()
413       return False
414
415     self._ts_finished = time.time()
416
417     self._ReportFinished(self._daemon.exit_status == 0,
418                          self._daemon.error_message)
419
420     return True
421
422   def _ReportFinished(self, success, message):
423     """Transfer is finished or daemon exited.
424
425     @type success: bool
426     @param success: Whether the transfer was successful
427     @type message: string
428     @param message: Error message
429
430     """
431     assert self.success is None
432
433     self.success = success
434     self.final_message = message
435
436     if success:
437       logging.info("%s %r on %s succeeded", self.MODE_TEXT, self._daemon_name,
438                    self.node_name)
439     elif self._daemon_name:
440       self._lu.LogWarning("%s %r on %s failed: %s",
441                           self.MODE_TEXT, self._daemon_name, self.node_name,
442                           message)
443     else:
444       self._lu.LogWarning("%s on %s failed: %s", self.MODE_TEXT,
445                           self.node_name, message)
446
447     self._cbs.ReportFinished(self, self._private)
448
449   def _Finalize(self):
450     """Makes the RPC call to finalize this import/export.
451
452     """
453     return self._lu.rpc.call_impexp_cleanup(self.node_name, self._daemon_name)
454
455   def Finalize(self, error=None):
456     """Finalizes this import/export.
457
458     """
459     if self._daemon_name:
460       logging.info("Finalizing %s %r on %s",
461                    self.MODE_TEXT, self._daemon_name, self.node_name)
462
463       result = self._Finalize()
464       if result.fail_msg:
465         self._lu.LogWarning("Failed to finalize %s %r on %s: %s",
466                             self.MODE_TEXT, self._daemon_name,
467                             self.node_name, result.fail_msg)
468         return False
469
470       # Daemon is no longer running
471       self._daemon_name = None
472       self._ts_cleanup = time.time()
473
474     if error:
475       self._ReportFinished(False, error)
476
477     return True
478
479
480 class DiskImport(_DiskImportExportBase):
481   MODE_TEXT = "import"
482
483   def __init__(self, lu, node_name, opts, instance,
484                dest, dest_args, timeouts, cbs, private=None):
485     """Initializes this class.
486
487     @param lu: Logical unit instance
488     @type node_name: string
489     @param node_name: Node name for import
490     @type opts: L{objects.ImportExportOptions}
491     @param opts: Import/export daemon options
492     @type instance: L{objects.Instance}
493     @param instance: Instance object
494     @param dest: I/O destination
495     @param dest_args: I/O arguments
496     @type timeouts: L{ImportExportTimeouts}
497     @param timeouts: Timeouts for this import
498     @type cbs: L{ImportExportCbBase}
499     @param cbs: Callbacks
500     @param private: Private data for callback functions
501
502     """
503     _DiskImportExportBase.__init__(self, lu, node_name, opts,
504                                    instance, timeouts, cbs, private)
505     self._dest = dest
506     self._dest_args = dest_args
507
508     # Timestamps
509     self._ts_listening = None
510
511   @property
512   def listen_port(self):
513     """Returns the port the daemon is listening on.
514
515     """
516     if self._daemon:
517       return self._daemon.listen_port
518
519     return None
520
521   def _StartDaemon(self):
522     """Starts the import daemon.
523
524     """
525     return self._lu.rpc.call_import_start(self.node_name, self._opts,
526                                           self._instance,
527                                           self._dest, self._dest_args)
528
529   def CheckListening(self):
530     """Checks whether the daemon is listening.
531
532     @rtype: bool
533     @return: Whether the daemon is listening
534
535     """
536     assert self._daemon, "Daemon status missing"
537
538     if self._ts_listening is not None:
539       return True
540
541     port = self._daemon.listen_port
542     if port is not None:
543       self._ts_listening = time.time()
544
545       logging.debug("Import %r on %s is now listening on port %s",
546                     self._daemon_name, self.node_name, port)
547
548       self._cbs.ReportListening(self, self._private)
549
550       return True
551
552     if _TimeoutExpired(self._ts_begin, self._timeouts.listen):
553       raise _ImportExportError("Not listening after %s seconds" %
554                                self._timeouts.listen)
555
556     return False
557
558   def _GetConnectedCheckEpoch(self):
559     """Returns the time since we started listening.
560
561     """
562     assert self._ts_listening is not None, \
563            ("Checking whether an import is connected is only useful"
564             " once it's been listening")
565
566     return self._ts_listening
567
568
569 class DiskExport(_DiskImportExportBase):
570   MODE_TEXT = "export"
571
572   def __init__(self, lu, node_name, opts,
573                dest_host, dest_port, instance, source, source_args,
574                timeouts, cbs, private=None):
575     """Initializes this class.
576
577     @param lu: Logical unit instance
578     @type node_name: string
579     @param node_name: Node name for import
580     @type opts: L{objects.ImportExportOptions}
581     @param opts: Import/export daemon options
582     @type dest_host: string
583     @param dest_host: Destination host name or IP address
584     @type dest_port: number
585     @param dest_port: Destination port number
586     @type instance: L{objects.Instance}
587     @param instance: Instance object
588     @param source: I/O source
589     @param source_args: I/O source
590     @type timeouts: L{ImportExportTimeouts}
591     @param timeouts: Timeouts for this import
592     @type cbs: L{ImportExportCbBase}
593     @param cbs: Callbacks
594     @param private: Private data for callback functions
595
596     """
597     _DiskImportExportBase.__init__(self, lu, node_name, opts,
598                                    instance, timeouts, cbs, private)
599     self._dest_host = dest_host
600     self._dest_port = dest_port
601     self._source = source
602     self._source_args = source_args
603
604   def _StartDaemon(self):
605     """Starts the export daemon.
606
607     """
608     return self._lu.rpc.call_export_start(self.node_name, self._opts,
609                                           self._dest_host, self._dest_port,
610                                           self._instance, self._source,
611                                           self._source_args)
612
613   def CheckListening(self):
614     """Checks whether the daemon is listening.
615
616     """
617     # Only an import can be listening
618     return True
619
620   def _GetConnectedCheckEpoch(self):
621     """Returns the time since the daemon started.
622
623     """
624     assert self._ts_begin is not None
625
626     return self._ts_begin
627
628
629 def FormatProgress(progress):
630   """Formats progress information for user consumption
631
632   """
633   (mbytes, throughput, percent, eta) = progress
634
635   parts = [
636     utils.FormatUnit(mbytes, "h"),
637
638     # Not using FormatUnit as it doesn't support kilobytes
639     "%0.1f MiB/s" % throughput,
640     ]
641
642   if percent is not None:
643     parts.append("%d%%" % percent)
644
645   if eta is not None:
646     parts.append("ETA %s" % utils.FormatSeconds(eta))
647
648   return utils.CommaJoin(parts)
649
650
651 class ImportExportLoop:
652   MIN_DELAY = 1.0
653   MAX_DELAY = 20.0
654
655   def __init__(self, lu):
656     """Initializes this class.
657
658     """
659     self._lu = lu
660     self._queue = []
661     self._pending_add = []
662
663   def Add(self, diskie):
664     """Adds an import/export object to the loop.
665
666     @type diskie: Subclass of L{_DiskImportExportBase}
667     @param diskie: Import/export object
668
669     """
670     assert diskie not in self._pending_add
671     assert diskie.loop is None
672
673     diskie.SetLoop(self)
674
675     # Adding new objects to a staging list is necessary, otherwise the main
676     # loop gets confused if callbacks modify the queue while the main loop is
677     # iterating over it.
678     self._pending_add.append(diskie)
679
680   @staticmethod
681   def _CollectDaemonStatus(lu, daemons):
682     """Collects the status for all import/export daemons.
683
684     """
685     daemon_status = {}
686
687     for node_name, names in daemons.iteritems():
688       result = lu.rpc.call_impexp_status(node_name, names)
689       if result.fail_msg:
690         lu.LogWarning("Failed to get daemon status on %s: %s",
691                       node_name, result.fail_msg)
692         continue
693
694       assert len(names) == len(result.payload)
695
696       daemon_status[node_name] = dict(zip(names, result.payload))
697
698     return daemon_status
699
700   @staticmethod
701   def _GetActiveDaemonNames(queue):
702     """Gets the names of all active daemons.
703
704     """
705     result = {}
706     for diskie in queue:
707       if not diskie.active:
708         continue
709
710       try:
711         # Start daemon if necessary
712         daemon_name = diskie.CheckDaemon()
713       except _ImportExportError, err:
714         logging.exception("%s failed", diskie.MODE_TEXT)
715         diskie.Finalize(error=str(err))
716         continue
717
718       result.setdefault(diskie.node_name, []).append(daemon_name)
719
720     assert len(queue) >= len(result)
721     assert len(queue) >= sum([len(names) for names in result.itervalues()])
722
723     logging.debug("daemons=%r", result)
724
725     return result
726
727   def _AddPendingToQueue(self):
728     """Adds all pending import/export objects to the internal queue.
729
730     """
731     assert compat.all(diskie not in self._queue and diskie.loop == self
732                       for diskie in self._pending_add)
733
734     self._queue.extend(self._pending_add)
735
736     del self._pending_add[:]
737
738   def Run(self):
739     """Utility main loop.
740
741     """
742     while True:
743       self._AddPendingToQueue()
744
745       # Collect all active daemon names
746       daemons = self._GetActiveDaemonNames(self._queue)
747       if not daemons:
748         break
749
750       # Collection daemon status data
751       data = self._CollectDaemonStatus(self._lu, daemons)
752
753       # Use data
754       delay = self.MAX_DELAY
755       for diskie in self._queue:
756         if not diskie.active:
757           continue
758
759         try:
760           try:
761             all_daemon_data = data[diskie.node_name]
762           except KeyError:
763             result = diskie.SetDaemonData(False, None)
764           else:
765             result = \
766               diskie.SetDaemonData(True,
767                                    all_daemon_data[diskie.GetDaemonName()])
768
769           if not result:
770             # Daemon not yet ready, retry soon
771             delay = min(3.0, delay)
772             continue
773
774           if diskie.CheckFinished():
775             # Transfer finished
776             diskie.Finalize()
777             continue
778
779           # Normal case: check again in 5 seconds
780           delay = min(5.0, delay)
781
782           if not diskie.CheckListening():
783             # Not yet listening, retry soon
784             delay = min(1.0, delay)
785             continue
786
787           if not diskie.CheckConnected():
788             # Not yet connected, retry soon
789             delay = min(1.0, delay)
790             continue
791
792         except _ImportExportError, err:
793           logging.exception("%s failed", diskie.MODE_TEXT)
794           diskie.Finalize(error=str(err))
795
796       if not compat.any([diskie.active for diskie in self._queue]):
797         break
798
799       # Wait a bit
800       delay = min(self.MAX_DELAY, max(self.MIN_DELAY, delay))
801       logging.debug("Waiting for %ss", delay)
802       time.sleep(delay)
803
804   def FinalizeAll(self):
805     """Finalizes all pending transfers.
806
807     """
808     success = True
809
810     for diskie in self._queue:
811       success = diskie.Finalize() and success
812
813     return success
814
815
816 class _TransferInstCbBase(ImportExportCbBase):
817   def __init__(self, lu, feedback_fn, instance, timeouts, src_node, src_cbs,
818                dest_node, dest_ip):
819     """Initializes this class.
820
821     """
822     ImportExportCbBase.__init__(self)
823
824     self.lu = lu
825     self.feedback_fn = feedback_fn
826     self.instance = instance
827     self.timeouts = timeouts
828     self.src_node = src_node
829     self.src_cbs = src_cbs
830     self.dest_node = dest_node
831     self.dest_ip = dest_ip
832
833
834 class _TransferInstSourceCb(_TransferInstCbBase):
835   def ReportConnected(self, ie, dtp):
836     """Called when a connection has been established.
837
838     """
839     assert self.src_cbs is None
840     assert dtp.src_export == ie
841     assert dtp.dest_import
842
843     self.feedback_fn("%s is sending data on %s" %
844                      (dtp.data.name, ie.node_name))
845
846   def ReportProgress(self, ie, dtp):
847     """Called when new progress information should be reported.
848
849     """
850     progress = ie.progress
851     if not progress:
852       return
853
854     self.feedback_fn("%s sent %s" % (dtp.data.name, FormatProgress(progress)))
855
856   def ReportFinished(self, ie, dtp):
857     """Called when a transfer has finished.
858
859     """
860     assert self.src_cbs is None
861     assert dtp.src_export == ie
862     assert dtp.dest_import
863
864     if ie.success:
865       self.feedback_fn("%s finished sending data" % dtp.data.name)
866     else:
867       self.feedback_fn("%s failed to send data: %s (recent output: %r)" %
868                        (dtp.data.name, ie.final_message, ie.recent_output))
869
870     dtp.RecordResult(ie.success)
871
872     cb = dtp.data.finished_fn
873     if cb:
874       cb()
875
876     # TODO: Check whether sending SIGTERM right away is okay, maybe we should
877     # give the daemon a moment to sort things out
878     if dtp.dest_import and not ie.success:
879       dtp.dest_import.Abort()
880
881
882 class _TransferInstDestCb(_TransferInstCbBase):
883   def ReportListening(self, ie, dtp):
884     """Called when daemon started listening.
885
886     """
887     assert self.src_cbs
888     assert dtp.src_export is None
889     assert dtp.dest_import
890     assert dtp.export_opts
891
892     self.feedback_fn("%s is now listening, starting export" % dtp.data.name)
893
894     # Start export on source node
895     de = DiskExport(self.lu, self.src_node, dtp.export_opts,
896                     self.dest_ip, ie.listen_port,
897                     self.instance, dtp.data.src_io, dtp.data.src_ioargs,
898                     self.timeouts, self.src_cbs, private=dtp)
899     ie.loop.Add(de)
900
901     dtp.src_export = de
902
903   def ReportConnected(self, ie, dtp):
904     """Called when a connection has been established.
905
906     """
907     self.feedback_fn("%s is receiving data on %s" %
908                      (dtp.data.name, self.dest_node))
909
910   def ReportFinished(self, ie, dtp):
911     """Called when a transfer has finished.
912
913     """
914     if ie.success:
915       self.feedback_fn("%s finished receiving data" % dtp.data.name)
916     else:
917       self.feedback_fn("%s failed to receive data: %s (recent output: %r)" %
918                        (dtp.data.name, ie.final_message, ie.recent_output))
919
920     dtp.RecordResult(ie.success)
921
922     # TODO: Check whether sending SIGTERM right away is okay, maybe we should
923     # give the daemon a moment to sort things out
924     if dtp.src_export and not ie.success:
925       dtp.src_export.Abort()
926
927
928 class DiskTransfer(object):
929   def __init__(self, name, src_io, src_ioargs, dest_io, dest_ioargs,
930                finished_fn):
931     """Initializes this class.
932
933     @type name: string
934     @param name: User-visible name for this transfer (e.g. "disk/0")
935     @param src_io: Source I/O type
936     @param src_ioargs: Source I/O arguments
937     @param dest_io: Destination I/O type
938     @param dest_ioargs: Destination I/O arguments
939     @type finished_fn: callable
940     @param finished_fn: Function called once transfer has finished
941
942     """
943     self.name = name
944
945     self.src_io = src_io
946     self.src_ioargs = src_ioargs
947
948     self.dest_io = dest_io
949     self.dest_ioargs = dest_ioargs
950
951     self.finished_fn = finished_fn
952
953
954 class _DiskTransferPrivate(object):
955   def __init__(self, data, success, export_opts):
956     """Initializes this class.
957
958     @type data: L{DiskTransfer}
959     @type success: bool
960
961     """
962     self.data = data
963     self.success = success
964     self.export_opts = export_opts
965
966     self.src_export = None
967     self.dest_import = None
968
969   def RecordResult(self, success):
970     """Updates the status.
971
972     One failed part will cause the whole transfer to fail.
973
974     """
975     self.success = self.success and success
976
977
978 def _GetInstDiskMagic(base, instance_name, index):
979   """Computes the magic value for a disk export or import.
980
981   @type base: string
982   @param base: Random seed value (can be the same for all disks of a transfer)
983   @type instance_name: string
984   @param instance_name: Name of instance
985   @type index: number
986   @param index: Disk index
987
988   """
989   h = compat.sha1_hash()
990   h.update(str(constants.RIE_VERSION))
991   h.update(base)
992   h.update(instance_name)
993   h.update(str(index))
994   return h.hexdigest()
995
996
997 def TransferInstanceData(lu, feedback_fn, src_node, dest_node, dest_ip,
998                          instance, all_transfers):
999   """Transfers an instance's data from one node to another.
1000
1001   @param lu: Logical unit instance
1002   @param feedback_fn: Feedback function
1003   @type src_node: string
1004   @param src_node: Source node name
1005   @type dest_node: string
1006   @param dest_node: Destination node name
1007   @type dest_ip: string
1008   @param dest_ip: IP address of destination node
1009   @type instance: L{objects.Instance}
1010   @param instance: Instance object
1011   @type all_transfers: list of L{DiskTransfer} instances
1012   @param all_transfers: List of all disk transfers to be made
1013   @rtype: list
1014   @return: List with a boolean (True=successful, False=failed) for success for
1015            each transfer
1016
1017   """
1018   # Disable compression for all moves as these are all within the same cluster
1019   compress = constants.IEC_NONE
1020
1021   logging.debug("Source node %s, destination node %s, compression '%s'",
1022                 src_node, dest_node, compress)
1023
1024   timeouts = ImportExportTimeouts(constants.DISK_TRANSFER_CONNECT_TIMEOUT)
1025   src_cbs = _TransferInstSourceCb(lu, feedback_fn, instance, timeouts,
1026                                   src_node, None, dest_node, dest_ip)
1027   dest_cbs = _TransferInstDestCb(lu, feedback_fn, instance, timeouts,
1028                                  src_node, src_cbs, dest_node, dest_ip)
1029
1030   all_dtp = []
1031
1032   base_magic = utils.GenerateSecret(6)
1033
1034   ieloop = ImportExportLoop(lu)
1035   try:
1036     for idx, transfer in enumerate(all_transfers):
1037       if transfer:
1038         feedback_fn("Exporting %s from %s to %s" %
1039                     (transfer.name, src_node, dest_node))
1040
1041         magic = _GetInstDiskMagic(base_magic, instance.name, idx)
1042         opts = objects.ImportExportOptions(key_name=None, ca_pem=None,
1043                                            compress=compress, magic=magic)
1044
1045         dtp = _DiskTransferPrivate(transfer, True, opts)
1046
1047         di = DiskImport(lu, dest_node, opts, instance,
1048                         transfer.dest_io, transfer.dest_ioargs,
1049                         timeouts, dest_cbs, private=dtp)
1050         ieloop.Add(di)
1051
1052         dtp.dest_import = di
1053       else:
1054         dtp = _DiskTransferPrivate(None, False)
1055
1056       all_dtp.append(dtp)
1057
1058     ieloop.Run()
1059   finally:
1060     ieloop.FinalizeAll()
1061
1062   assert len(all_dtp) == len(all_transfers)
1063   assert compat.all([(dtp.src_export is None or
1064                       dtp.src_export.success is not None) and
1065                      (dtp.dest_import is None or
1066                       dtp.dest_import.success is not None)
1067                      for dtp in all_dtp]), \
1068          "Not all imports/exports are finalized"
1069
1070   return [bool(dtp.success) for dtp in all_dtp]
1071
1072
1073 class _RemoteExportCb(ImportExportCbBase):
1074   def __init__(self, feedback_fn, disk_count):
1075     """Initializes this class.
1076
1077     """
1078     ImportExportCbBase.__init__(self)
1079     self._feedback_fn = feedback_fn
1080     self._dresults = [None] * disk_count
1081
1082   @property
1083   def disk_results(self):
1084     """Returns per-disk results.
1085
1086     """
1087     return self._dresults
1088
1089   def ReportConnected(self, ie, private):
1090     """Called when a connection has been established.
1091
1092     """
1093     (idx, _) = private
1094
1095     self._feedback_fn("Disk %s is now sending data" % idx)
1096
1097   def ReportProgress(self, ie, private):
1098     """Called when new progress information should be reported.
1099
1100     """
1101     (idx, _) = private
1102
1103     progress = ie.progress
1104     if not progress:
1105       return
1106
1107     self._feedback_fn("Disk %s sent %s" % (idx, FormatProgress(progress)))
1108
1109   def ReportFinished(self, ie, private):
1110     """Called when a transfer has finished.
1111
1112     """
1113     (idx, finished_fn) = private
1114
1115     if ie.success:
1116       self._feedback_fn("Disk %s finished sending data" % idx)
1117     else:
1118       self._feedback_fn("Disk %s failed to send data: %s (recent output: %r)" %
1119                         (idx, ie.final_message, ie.recent_output))
1120
1121     self._dresults[idx] = bool(ie.success)
1122
1123     if finished_fn:
1124       finished_fn()
1125
1126
1127 class ExportInstanceHelper:
1128   def __init__(self, lu, feedback_fn, instance):
1129     """Initializes this class.
1130
1131     @param lu: Logical unit instance
1132     @param feedback_fn: Feedback function
1133     @type instance: L{objects.Instance}
1134     @param instance: Instance object
1135
1136     """
1137     self._lu = lu
1138     self._feedback_fn = feedback_fn
1139     self._instance = instance
1140
1141     self._snap_disks = []
1142     self._removed_snaps = [False] * len(instance.disks)
1143
1144   def CreateSnapshots(self):
1145     """Creates an LVM snapshot for every disk of the instance.
1146
1147     """
1148     assert not self._snap_disks
1149
1150     instance = self._instance
1151     src_node = instance.primary_node
1152
1153     vgname = self._lu.cfg.GetVGName()
1154
1155     for idx, disk in enumerate(instance.disks):
1156       self._feedback_fn("Creating a snapshot of disk/%s on node %s" %
1157                         (idx, src_node))
1158
1159       # result.payload will be a snapshot of an lvm leaf of the one we
1160       # passed
1161       result = self._lu.rpc.call_blockdev_snapshot(src_node, disk)
1162       msg = result.fail_msg
1163       if msg:
1164         self._lu.LogWarning("Could not snapshot disk/%s on node %s: %s",
1165                             idx, src_node, msg)
1166         new_dev = False
1167       else:
1168         disk_id = (vgname, result.payload)
1169         new_dev = objects.Disk(dev_type=constants.LD_LV, size=disk.size,
1170                                logical_id=disk_id, physical_id=disk_id,
1171                                iv_name=disk.iv_name)
1172
1173       self._snap_disks.append(new_dev)
1174
1175     assert len(self._snap_disks) == len(instance.disks)
1176     assert len(self._removed_snaps) == len(instance.disks)
1177
1178   def _RemoveSnapshot(self, disk_index):
1179     """Removes an LVM snapshot.
1180
1181     @type disk_index: number
1182     @param disk_index: Index of the snapshot to be removed
1183
1184     """
1185     disk = self._snap_disks[disk_index]
1186     if disk and not self._removed_snaps[disk_index]:
1187       src_node = self._instance.primary_node
1188
1189       self._feedback_fn("Removing snapshot of disk/%s on node %s" %
1190                         (disk_index, src_node))
1191
1192       result = self._lu.rpc.call_blockdev_remove(src_node, disk)
1193       if result.fail_msg:
1194         self._lu.LogWarning("Could not remove snapshot for disk/%d from node"
1195                             " %s: %s", disk_index, src_node, result.fail_msg)
1196       else:
1197         self._removed_snaps[disk_index] = True
1198
1199   def LocalExport(self, dest_node):
1200     """Intra-cluster instance export.
1201
1202     @type dest_node: L{objects.Node}
1203     @param dest_node: Destination node
1204
1205     """
1206     instance = self._instance
1207     src_node = instance.primary_node
1208
1209     assert len(self._snap_disks) == len(instance.disks)
1210
1211     transfers = []
1212
1213     for idx, dev in enumerate(self._snap_disks):
1214       if not dev:
1215         transfers.append(None)
1216         continue
1217
1218       path = utils.PathJoin(constants.EXPORT_DIR, "%s.new" % instance.name,
1219                             dev.physical_id[1])
1220
1221       finished_fn = compat.partial(self._TransferFinished, idx)
1222
1223       # FIXME: pass debug option from opcode to backend
1224       dt = DiskTransfer("snapshot/%s" % idx,
1225                         constants.IEIO_SCRIPT, (dev, idx),
1226                         constants.IEIO_FILE, (path, ),
1227                         finished_fn)
1228       transfers.append(dt)
1229
1230     # Actually export data
1231     dresults = TransferInstanceData(self._lu, self._feedback_fn,
1232                                     src_node, dest_node.name,
1233                                     dest_node.secondary_ip,
1234                                     instance, transfers)
1235
1236     assert len(dresults) == len(instance.disks)
1237
1238     self._feedback_fn("Finalizing export on %s" % dest_node.name)
1239     result = self._lu.rpc.call_finalize_export(dest_node.name, instance,
1240                                                self._snap_disks)
1241     msg = result.fail_msg
1242     fin_resu = not msg
1243     if msg:
1244       self._lu.LogWarning("Could not finalize export for instance %s"
1245                           " on node %s: %s", instance.name, dest_node.name, msg)
1246
1247     return (fin_resu, dresults)
1248
1249   def RemoteExport(self, disk_info, key_name, dest_ca_pem, timeouts):
1250     """Inter-cluster instance export.
1251
1252     @type disk_info: list
1253     @param disk_info: Per-disk destination information
1254     @type key_name: string
1255     @param key_name: Name of X509 key to use
1256     @type dest_ca_pem: string
1257     @param dest_ca_pem: Destination X509 CA in PEM format
1258     @type timeouts: L{ImportExportTimeouts}
1259     @param timeouts: Timeouts for this import
1260
1261     """
1262     instance = self._instance
1263
1264     assert len(disk_info) == len(instance.disks)
1265
1266     cbs = _RemoteExportCb(self._feedback_fn, len(instance.disks))
1267
1268     ieloop = ImportExportLoop(self._lu)
1269     try:
1270       for idx, (dev, (host, port, magic)) in enumerate(zip(instance.disks,
1271                                                            disk_info)):
1272         opts = objects.ImportExportOptions(key_name=key_name,
1273                                            ca_pem=dest_ca_pem,
1274                                            magic=magic)
1275
1276         self._feedback_fn("Sending disk %s to %s:%s" % (idx, host, port))
1277         finished_fn = compat.partial(self._TransferFinished, idx)
1278         ieloop.Add(DiskExport(self._lu, instance.primary_node,
1279                               opts, host, port, instance,
1280                               constants.IEIO_SCRIPT, (dev, idx),
1281                               timeouts, cbs, private=(idx, finished_fn)))
1282
1283       ieloop.Run()
1284     finally:
1285       ieloop.FinalizeAll()
1286
1287     return (True, cbs.disk_results)
1288
1289   def _TransferFinished(self, idx):
1290     """Called once a transfer has finished.
1291
1292     @type idx: number
1293     @param idx: Disk index
1294
1295     """
1296     logging.debug("Transfer %s finished", idx)
1297     self._RemoveSnapshot(idx)
1298
1299   def Cleanup(self):
1300     """Remove all snapshots.
1301
1302     """
1303     assert len(self._removed_snaps) == len(self._instance.disks)
1304     for idx in range(len(self._instance.disks)):
1305       self._RemoveSnapshot(idx)
1306
1307
1308 class _RemoteImportCb(ImportExportCbBase):
1309   def __init__(self, feedback_fn, cds, x509_cert_pem, disk_count,
1310                external_address):
1311     """Initializes this class.
1312
1313     @type cds: string
1314     @param cds: Cluster domain secret
1315     @type x509_cert_pem: string
1316     @param x509_cert_pem: CA used for signing import key
1317     @type disk_count: number
1318     @param disk_count: Number of disks
1319     @type external_address: string
1320     @param external_address: External address of destination node
1321
1322     """
1323     ImportExportCbBase.__init__(self)
1324     self._feedback_fn = feedback_fn
1325     self._cds = cds
1326     self._x509_cert_pem = x509_cert_pem
1327     self._disk_count = disk_count
1328     self._external_address = external_address
1329
1330     self._dresults = [None] * disk_count
1331     self._daemon_port = [None] * disk_count
1332
1333     self._salt = utils.GenerateSecret(8)
1334
1335   @property
1336   def disk_results(self):
1337     """Returns per-disk results.
1338
1339     """
1340     return self._dresults
1341
1342   def _CheckAllListening(self):
1343     """Checks whether all daemons are listening.
1344
1345     If all daemons are listening, the information is sent to the client.
1346
1347     """
1348     if not compat.all(dp is not None for dp in self._daemon_port):
1349       return
1350
1351     host = self._external_address
1352
1353     disks = []
1354     for idx, (port, magic) in enumerate(self._daemon_port):
1355       disks.append(ComputeRemoteImportDiskInfo(self._cds, self._salt,
1356                                                idx, host, port, magic))
1357
1358     assert len(disks) == self._disk_count
1359
1360     self._feedback_fn(constants.ELOG_REMOTE_IMPORT, {
1361       "disks": disks,
1362       "x509_ca": self._x509_cert_pem,
1363       })
1364
1365   def ReportListening(self, ie, private):
1366     """Called when daemon started listening.
1367
1368     """
1369     (idx, ) = private
1370
1371     self._feedback_fn("Disk %s is now listening" % idx)
1372
1373     assert self._daemon_port[idx] is None
1374
1375     self._daemon_port[idx] = (ie.listen_port, ie.magic)
1376
1377     self._CheckAllListening()
1378
1379   def ReportConnected(self, ie, private):
1380     """Called when a connection has been established.
1381
1382     """
1383     (idx, ) = private
1384
1385     self._feedback_fn("Disk %s is now receiving data" % idx)
1386
1387   def ReportFinished(self, ie, private):
1388     """Called when a transfer has finished.
1389
1390     """
1391     (idx, ) = private
1392
1393     # Daemon is certainly no longer listening
1394     self._daemon_port[idx] = None
1395
1396     if ie.success:
1397       self._feedback_fn("Disk %s finished receiving data" % idx)
1398     else:
1399       self._feedback_fn(("Disk %s failed to receive data: %s"
1400                          " (recent output: %r)") %
1401                         (idx, ie.final_message, ie.recent_output))
1402
1403     self._dresults[idx] = bool(ie.success)
1404
1405
1406 def RemoteImport(lu, feedback_fn, instance, source_x509_ca, cds, timeouts):
1407   """Imports an instance from another cluster.
1408
1409   @param lu: Logical unit instance
1410   @param feedback_fn: Feedback function
1411   @type instance: L{objects.Instance}
1412   @param instance: Instance object
1413   @type source_x509_ca: OpenSSL.crypto.X509
1414   @param source_x509_ca: Import source's X509 CA
1415   @type cds: string
1416   @param cds: Cluster domain secret
1417   @type timeouts: L{ImportExportTimeouts}
1418   @param timeouts: Timeouts for this import
1419
1420   """
1421   source_ca_pem = OpenSSL.crypto.dump_certificate(OpenSSL.crypto.FILETYPE_PEM,
1422                                                   source_x509_ca)
1423
1424   magic_base = utils.GenerateSecret(6)
1425
1426   # Create crypto key
1427   result = lu.rpc.call_x509_cert_create(instance.primary_node,
1428                                         constants.RIE_CERT_VALIDITY)
1429   result.Raise("Can't create X509 key and certificate on %s" % result.node)
1430
1431   (x509_key_name, x509_cert_pem) = result.payload
1432   try:
1433     # Load certificate
1434     x509_cert = OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_PEM,
1435                                                 x509_cert_pem)
1436
1437     # Sign certificate
1438     signed_x509_cert_pem = \
1439       utils.SignX509Certificate(x509_cert, cds, utils.GenerateSecret(8))
1440
1441     cbs = _RemoteImportCb(feedback_fn, cds, signed_x509_cert_pem,
1442                           len(instance.disks), instance.primary_node)
1443
1444     ieloop = ImportExportLoop(lu)
1445     try:
1446       for idx, dev in enumerate(instance.disks):
1447         magic = _GetInstDiskMagic(magic_base, instance.name, idx)
1448
1449         # Import daemon options
1450         opts = objects.ImportExportOptions(key_name=x509_key_name,
1451                                            ca_pem=source_ca_pem,
1452                                            magic=magic)
1453
1454         ieloop.Add(DiskImport(lu, instance.primary_node, opts, instance,
1455                               constants.IEIO_SCRIPT, (dev, idx),
1456                               timeouts, cbs, private=(idx, )))
1457
1458       ieloop.Run()
1459     finally:
1460       ieloop.FinalizeAll()
1461   finally:
1462     # Remove crypto key and certificate
1463     result = lu.rpc.call_x509_cert_remove(instance.primary_node, x509_key_name)
1464     result.Raise("Can't remove X509 key and certificate on %s" % result.node)
1465
1466   return cbs.disk_results
1467
1468
1469 def _GetImportExportHandshakeMessage(version):
1470   """Returns the handshake message for a RIE protocol version.
1471
1472   @type version: number
1473
1474   """
1475   return "%s:%s" % (version, constants.RIE_HANDSHAKE)
1476
1477
1478 def ComputeRemoteExportHandshake(cds):
1479   """Computes the remote import/export handshake.
1480
1481   @type cds: string
1482   @param cds: Cluster domain secret
1483
1484   """
1485   salt = utils.GenerateSecret(8)
1486   msg = _GetImportExportHandshakeMessage(constants.RIE_VERSION)
1487   return (constants.RIE_VERSION, utils.Sha1Hmac(cds, msg, salt=salt), salt)
1488
1489
1490 def CheckRemoteExportHandshake(cds, handshake):
1491   """Checks the handshake of a remote import/export.
1492
1493   @type cds: string
1494   @param cds: Cluster domain secret
1495   @type handshake: sequence
1496   @param handshake: Handshake sent by remote peer
1497
1498   """
1499   try:
1500     (version, hmac_digest, hmac_salt) = handshake
1501   except (TypeError, ValueError), err:
1502     return "Invalid data: %s" % err
1503
1504   if not utils.VerifySha1Hmac(cds, _GetImportExportHandshakeMessage(version),
1505                               hmac_digest, salt=hmac_salt):
1506     return "Hash didn't match, clusters don't share the same domain secret"
1507
1508   if version != constants.RIE_VERSION:
1509     return ("Clusters don't have the same remote import/export protocol"
1510             " (local=%s, remote=%s)" %
1511             (constants.RIE_VERSION, version))
1512
1513   return None
1514
1515
1516 def _GetRieDiskInfoMessage(disk_index, host, port, magic):
1517   """Returns the hashed text for import/export disk information.
1518
1519   @type disk_index: number
1520   @param disk_index: Index of disk (included in hash)
1521   @type host: string
1522   @param host: Hostname
1523   @type port: number
1524   @param port: Daemon port
1525   @type magic: string
1526   @param magic: Magic value
1527
1528   """
1529   return "%s:%s:%s:%s" % (disk_index, host, port, magic)
1530
1531
1532 def CheckRemoteExportDiskInfo(cds, disk_index, disk_info):
1533   """Verifies received disk information for an export.
1534
1535   @type cds: string
1536   @param cds: Cluster domain secret
1537   @type disk_index: number
1538   @param disk_index: Index of disk (included in hash)
1539   @type disk_info: sequence
1540   @param disk_info: Disk information sent by remote peer
1541
1542   """
1543   try:
1544     (host, port, magic, hmac_digest, hmac_salt) = disk_info
1545   except (TypeError, ValueError), err:
1546     raise errors.GenericError("Invalid data: %s" % err)
1547
1548   if not (host and port and magic):
1549     raise errors.GenericError("Missing destination host, port or magic")
1550
1551   msg = _GetRieDiskInfoMessage(disk_index, host, port, magic)
1552
1553   if not utils.VerifySha1Hmac(cds, msg, hmac_digest, salt=hmac_salt):
1554     raise errors.GenericError("HMAC is wrong")
1555
1556   return (utils.HostInfo.NormalizeName(host),
1557           utils.ValidateServiceName(port),
1558           magic)
1559
1560
1561 def ComputeRemoteImportDiskInfo(cds, salt, disk_index, host, port, magic):
1562   """Computes the signed disk information for a remote import.
1563
1564   @type cds: string
1565   @param cds: Cluster domain secret
1566   @type salt: string
1567   @param salt: HMAC salt
1568   @type disk_index: number
1569   @param disk_index: Index of disk (included in hash)
1570   @type host: string
1571   @param host: Hostname
1572   @type port: number
1573   @param port: Daemon port
1574   @type magic: string
1575   @param magic: Magic value
1576
1577   """
1578   msg = _GetRieDiskInfoMessage(disk_index, host, port, magic)
1579   hmac_digest = utils.Sha1Hmac(cds, msg, salt=salt)
1580   return (host, port, magic, hmac_digest, salt)