Add basic support for disk parameters
[ganeti-local] / lib / masterd / instance.py
1 #
2 #
3
4 # Copyright (C) 2010, 2011 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Instance-related functions and classes for masterd.
23
24 """
25
26 import logging
27 import time
28 import OpenSSL
29
30 from ganeti import constants
31 from ganeti import errors
32 from ganeti import compat
33 from ganeti import utils
34 from ganeti import objects
35 from ganeti import netutils
36
37
38 class _ImportExportError(Exception):
39   """Local exception to report import/export errors.
40
41   """
42
43
44 class ImportExportTimeouts(object):
45   #: Time until daemon starts writing status file
46   DEFAULT_READY_TIMEOUT = 10
47
48   #: Length of time until errors cause hard failure
49   DEFAULT_ERROR_TIMEOUT = 10
50
51   #: Time after which daemon must be listening
52   DEFAULT_LISTEN_TIMEOUT = 10
53
54   #: Progress update interval
55   DEFAULT_PROGRESS_INTERVAL = 60
56
57   __slots__ = [
58     "error",
59     "ready",
60     "listen",
61     "connect",
62     "progress",
63     ]
64
65   def __init__(self, connect,
66                listen=DEFAULT_LISTEN_TIMEOUT,
67                error=DEFAULT_ERROR_TIMEOUT,
68                ready=DEFAULT_READY_TIMEOUT,
69                progress=DEFAULT_PROGRESS_INTERVAL):
70     """Initializes this class.
71
72     @type connect: number
73     @param connect: Timeout for establishing connection
74     @type listen: number
75     @param listen: Timeout for starting to listen for connections
76     @type error: number
77     @param error: Length of time until errors cause hard failure
78     @type ready: number
79     @param ready: Timeout for daemon to become ready
80     @type progress: number
81     @param progress: Progress update interval
82
83     """
84     self.error = error
85     self.ready = ready
86     self.listen = listen
87     self.connect = connect
88     self.progress = progress
89
90
91 class ImportExportCbBase(object):
92   """Callbacks for disk import/export.
93
94   """
95   def ReportListening(self, ie, private, component):
96     """Called when daemon started listening.
97
98     @type ie: Subclass of L{_DiskImportExportBase}
99     @param ie: Import/export object
100     @param private: Private data passed to import/export object
101     @param component: transfer component name
102
103     """
104
105   def ReportConnected(self, ie, private):
106     """Called when a connection has been established.
107
108     @type ie: Subclass of L{_DiskImportExportBase}
109     @param ie: Import/export object
110     @param private: Private data passed to import/export object
111
112     """
113
114   def ReportProgress(self, ie, private):
115     """Called when new progress information should be reported.
116
117     @type ie: Subclass of L{_DiskImportExportBase}
118     @param ie: Import/export object
119     @param private: Private data passed to import/export object
120
121     """
122
123   def ReportFinished(self, ie, private):
124     """Called when a transfer has finished.
125
126     @type ie: Subclass of L{_DiskImportExportBase}
127     @param ie: Import/export object
128     @param private: Private data passed to import/export object
129
130     """
131
132
133 class _DiskImportExportBase(object):
134   MODE_TEXT = None
135
136   def __init__(self, lu, node_name, opts,
137                instance, component, timeouts, cbs, private=None):
138     """Initializes this class.
139
140     @param lu: Logical unit instance
141     @type node_name: string
142     @param node_name: Node name for import
143     @type opts: L{objects.ImportExportOptions}
144     @param opts: Import/export daemon options
145     @type instance: L{objects.Instance}
146     @param instance: Instance object
147     @type component: string
148     @param component: which part of the instance is being imported
149     @type timeouts: L{ImportExportTimeouts}
150     @param timeouts: Timeouts for this import
151     @type cbs: L{ImportExportCbBase}
152     @param cbs: Callbacks
153     @param private: Private data for callback functions
154
155     """
156     assert self.MODE_TEXT
157
158     self._lu = lu
159     self.node_name = node_name
160     self._opts = opts.Copy()
161     self._instance = instance
162     self._component = component
163     self._timeouts = timeouts
164     self._cbs = cbs
165     self._private = private
166
167     # Set master daemon's timeout in options for import/export daemon
168     assert self._opts.connect_timeout is None
169     self._opts.connect_timeout = timeouts.connect
170
171     # Parent loop
172     self._loop = None
173
174     # Timestamps
175     self._ts_begin = None
176     self._ts_connected = None
177     self._ts_finished = None
178     self._ts_cleanup = None
179     self._ts_last_progress = None
180     self._ts_last_error = None
181
182     # Transfer status
183     self.success = None
184     self.final_message = None
185
186     # Daemon status
187     self._daemon_name = None
188     self._daemon = None
189
190   @property
191   def recent_output(self):
192     """Returns the most recent output from the daemon.
193
194     """
195     if self._daemon:
196       return "\n".join(self._daemon.recent_output)
197
198     return None
199
200   @property
201   def progress(self):
202     """Returns transfer progress information.
203
204     """
205     if not self._daemon:
206       return None
207
208     return (self._daemon.progress_mbytes,
209             self._daemon.progress_throughput,
210             self._daemon.progress_percent,
211             self._daemon.progress_eta)
212
213   @property
214   def magic(self):
215     """Returns the magic value for this import/export.
216
217     """
218     return self._opts.magic
219
220   @property
221   def active(self):
222     """Determines whether this transport is still active.
223
224     """
225     return self.success is None
226
227   @property
228   def loop(self):
229     """Returns parent loop.
230
231     @rtype: L{ImportExportLoop}
232
233     """
234     return self._loop
235
236   def SetLoop(self, loop):
237     """Sets the parent loop.
238
239     @type loop: L{ImportExportLoop}
240
241     """
242     if self._loop:
243       raise errors.ProgrammerError("Loop can only be set once")
244
245     self._loop = loop
246
247   def _StartDaemon(self):
248     """Starts the import/export daemon.
249
250     """
251     raise NotImplementedError()
252
253   def CheckDaemon(self):
254     """Checks whether daemon has been started and if not, starts it.
255
256     @rtype: string
257     @return: Daemon name
258
259     """
260     assert self._ts_cleanup is None
261
262     if self._daemon_name is None:
263       assert self._ts_begin is None
264
265       result = self._StartDaemon()
266       if result.fail_msg:
267         raise _ImportExportError("Failed to start %s on %s: %s" %
268                                  (self.MODE_TEXT, self.node_name,
269                                   result.fail_msg))
270
271       daemon_name = result.payload
272
273       logging.info("Started %s '%s' on %s", self.MODE_TEXT, daemon_name,
274                    self.node_name)
275
276       self._ts_begin = time.time()
277       self._daemon_name = daemon_name
278
279     return self._daemon_name
280
281   def GetDaemonName(self):
282     """Returns the daemon name.
283
284     """
285     assert self._daemon_name, "Daemon has not been started"
286     assert self._ts_cleanup is None
287     return self._daemon_name
288
289   def Abort(self):
290     """Sends SIGTERM to import/export daemon (if still active).
291
292     """
293     if self._daemon_name:
294       self._lu.LogWarning("Aborting %s '%s' on %s",
295                           self.MODE_TEXT, self._daemon_name, self.node_name)
296       result = self._lu.rpc.call_impexp_abort(self.node_name, self._daemon_name)
297       if result.fail_msg:
298         self._lu.LogWarning("Failed to abort %s '%s' on %s: %s",
299                             self.MODE_TEXT, self._daemon_name,
300                             self.node_name, result.fail_msg)
301         return False
302
303     return True
304
305   def _SetDaemonData(self, data):
306     """Internal function for updating status daemon data.
307
308     @type data: L{objects.ImportExportStatus}
309     @param data: Daemon status data
310
311     """
312     assert self._ts_begin is not None
313
314     if not data:
315       if utils.TimeoutExpired(self._ts_begin, self._timeouts.ready):
316         raise _ImportExportError("Didn't become ready after %s seconds" %
317                                  self._timeouts.ready)
318
319       return False
320
321     self._daemon = data
322
323     return True
324
325   def SetDaemonData(self, success, data):
326     """Updates daemon status data.
327
328     @type success: bool
329     @param success: Whether fetching data was successful or not
330     @type data: L{objects.ImportExportStatus}
331     @param data: Daemon status data
332
333     """
334     if not success:
335       if self._ts_last_error is None:
336         self._ts_last_error = time.time()
337
338       elif utils.TimeoutExpired(self._ts_last_error, self._timeouts.error):
339         raise _ImportExportError("Too many errors while updating data")
340
341       return False
342
343     self._ts_last_error = None
344
345     return self._SetDaemonData(data)
346
347   def CheckListening(self):
348     """Checks whether the daemon is listening.
349
350     """
351     raise NotImplementedError()
352
353   def _GetConnectedCheckEpoch(self):
354     """Returns timeout to calculate connect timeout.
355
356     """
357     raise NotImplementedError()
358
359   def CheckConnected(self):
360     """Checks whether the daemon is connected.
361
362     @rtype: bool
363     @return: Whether the daemon is connected
364
365     """
366     assert self._daemon, "Daemon status missing"
367
368     if self._ts_connected is not None:
369       return True
370
371     if self._daemon.connected:
372       self._ts_connected = time.time()
373
374       # TODO: Log remote peer
375       logging.debug("%s '%s' on %s is now connected",
376                     self.MODE_TEXT, self._daemon_name, self.node_name)
377
378       self._cbs.ReportConnected(self, self._private)
379
380       return True
381
382     if utils.TimeoutExpired(self._GetConnectedCheckEpoch(),
383                             self._timeouts.connect):
384       raise _ImportExportError("Not connected after %s seconds" %
385                                self._timeouts.connect)
386
387     return False
388
389   def _CheckProgress(self):
390     """Checks whether a progress update should be reported.
391
392     """
393     if ((self._ts_last_progress is None or
394         utils.TimeoutExpired(self._ts_last_progress,
395                              self._timeouts.progress)) and
396         self._daemon and
397         self._daemon.progress_mbytes is not None and
398         self._daemon.progress_throughput is not None):
399       self._cbs.ReportProgress(self, self._private)
400       self._ts_last_progress = time.time()
401
402   def CheckFinished(self):
403     """Checks whether the daemon exited.
404
405     @rtype: bool
406     @return: Whether the transfer is finished
407
408     """
409     assert self._daemon, "Daemon status missing"
410
411     if self._ts_finished:
412       return True
413
414     if self._daemon.exit_status is None:
415       # TODO: Adjust delay for ETA expiring soon
416       self._CheckProgress()
417       return False
418
419     self._ts_finished = time.time()
420
421     self._ReportFinished(self._daemon.exit_status == 0,
422                          self._daemon.error_message)
423
424     return True
425
426   def _ReportFinished(self, success, message):
427     """Transfer is finished or daemon exited.
428
429     @type success: bool
430     @param success: Whether the transfer was successful
431     @type message: string
432     @param message: Error message
433
434     """
435     assert self.success is None
436
437     self.success = success
438     self.final_message = message
439
440     if success:
441       logging.info("%s '%s' on %s succeeded", self.MODE_TEXT,
442                    self._daemon_name, self.node_name)
443     elif self._daemon_name:
444       self._lu.LogWarning("%s '%s' on %s failed: %s",
445                           self.MODE_TEXT, self._daemon_name, self.node_name,
446                           message)
447     else:
448       self._lu.LogWarning("%s on %s failed: %s", self.MODE_TEXT,
449                           self.node_name, message)
450
451     self._cbs.ReportFinished(self, self._private)
452
453   def _Finalize(self):
454     """Makes the RPC call to finalize this import/export.
455
456     """
457     return self._lu.rpc.call_impexp_cleanup(self.node_name, self._daemon_name)
458
459   def Finalize(self, error=None):
460     """Finalizes this import/export.
461
462     """
463     if self._daemon_name:
464       logging.info("Finalizing %s '%s' on %s",
465                    self.MODE_TEXT, self._daemon_name, self.node_name)
466
467       result = self._Finalize()
468       if result.fail_msg:
469         self._lu.LogWarning("Failed to finalize %s '%s' on %s: %s",
470                             self.MODE_TEXT, self._daemon_name,
471                             self.node_name, result.fail_msg)
472         return False
473
474       # Daemon is no longer running
475       self._daemon_name = None
476       self._ts_cleanup = time.time()
477
478     if error:
479       self._ReportFinished(False, error)
480
481     return True
482
483
484 class DiskImport(_DiskImportExportBase):
485   MODE_TEXT = "import"
486
487   def __init__(self, lu, node_name, opts, instance, component,
488                dest, dest_args, timeouts, cbs, private=None):
489     """Initializes this class.
490
491     @param lu: Logical unit instance
492     @type node_name: string
493     @param node_name: Node name for import
494     @type opts: L{objects.ImportExportOptions}
495     @param opts: Import/export daemon options
496     @type instance: L{objects.Instance}
497     @param instance: Instance object
498     @type component: string
499     @param component: which part of the instance is being imported
500     @param dest: I/O destination
501     @param dest_args: I/O arguments
502     @type timeouts: L{ImportExportTimeouts}
503     @param timeouts: Timeouts for this import
504     @type cbs: L{ImportExportCbBase}
505     @param cbs: Callbacks
506     @param private: Private data for callback functions
507
508     """
509     _DiskImportExportBase.__init__(self, lu, node_name, opts, instance,
510                                    component, timeouts, cbs, private)
511     self._dest = dest
512     self._dest_args = dest_args
513
514     # Timestamps
515     self._ts_listening = None
516
517   @property
518   def listen_port(self):
519     """Returns the port the daemon is listening on.
520
521     """
522     if self._daemon:
523       return self._daemon.listen_port
524
525     return None
526
527   def _StartDaemon(self):
528     """Starts the import daemon.
529
530     """
531     return self._lu.rpc.call_import_start(self.node_name, self._opts,
532                                           self._instance, self._component,
533                                           (self._dest, self._dest_args))
534
535   def CheckListening(self):
536     """Checks whether the daemon is listening.
537
538     @rtype: bool
539     @return: Whether the daemon is listening
540
541     """
542     assert self._daemon, "Daemon status missing"
543
544     if self._ts_listening is not None:
545       return True
546
547     port = self._daemon.listen_port
548     if port is not None:
549       self._ts_listening = time.time()
550
551       logging.debug("Import '%s' on %s is now listening on port %s",
552                     self._daemon_name, self.node_name, port)
553
554       self._cbs.ReportListening(self, self._private, self._component)
555
556       return True
557
558     if utils.TimeoutExpired(self._ts_begin, self._timeouts.listen):
559       raise _ImportExportError("Not listening after %s seconds" %
560                                self._timeouts.listen)
561
562     return False
563
564   def _GetConnectedCheckEpoch(self):
565     """Returns the time since we started listening.
566
567     """
568     assert self._ts_listening is not None, \
569            ("Checking whether an import is connected is only useful"
570             " once it's been listening")
571
572     return self._ts_listening
573
574
575 class DiskExport(_DiskImportExportBase):
576   MODE_TEXT = "export"
577
578   def __init__(self, lu, node_name, opts, dest_host, dest_port,
579                instance, component, source, source_args,
580                timeouts, cbs, private=None):
581     """Initializes this class.
582
583     @param lu: Logical unit instance
584     @type node_name: string
585     @param node_name: Node name for import
586     @type opts: L{objects.ImportExportOptions}
587     @param opts: Import/export daemon options
588     @type dest_host: string
589     @param dest_host: Destination host name or IP address
590     @type dest_port: number
591     @param dest_port: Destination port number
592     @type instance: L{objects.Instance}
593     @param instance: Instance object
594     @type component: string
595     @param component: which part of the instance is being imported
596     @param source: I/O source
597     @param source_args: I/O source
598     @type timeouts: L{ImportExportTimeouts}
599     @param timeouts: Timeouts for this import
600     @type cbs: L{ImportExportCbBase}
601     @param cbs: Callbacks
602     @param private: Private data for callback functions
603
604     """
605     _DiskImportExportBase.__init__(self, lu, node_name, opts, instance,
606                                    component, timeouts, cbs, private)
607     self._dest_host = dest_host
608     self._dest_port = dest_port
609     self._source = source
610     self._source_args = source_args
611
612   def _StartDaemon(self):
613     """Starts the export daemon.
614
615     """
616     return self._lu.rpc.call_export_start(self.node_name, self._opts,
617                                           self._dest_host, self._dest_port,
618                                           self._instance, self._component,
619                                           (self._source, self._source_args))
620
621   def CheckListening(self):
622     """Checks whether the daemon is listening.
623
624     """
625     # Only an import can be listening
626     return True
627
628   def _GetConnectedCheckEpoch(self):
629     """Returns the time since the daemon started.
630
631     """
632     assert self._ts_begin is not None
633
634     return self._ts_begin
635
636
637 def FormatProgress(progress):
638   """Formats progress information for user consumption
639
640   """
641   (mbytes, throughput, percent, eta) = progress
642
643   parts = [
644     utils.FormatUnit(mbytes, "h"),
645
646     # Not using FormatUnit as it doesn't support kilobytes
647     "%0.1f MiB/s" % throughput,
648     ]
649
650   if percent is not None:
651     parts.append("%d%%" % percent)
652
653   if eta is not None:
654     parts.append("ETA %s" % utils.FormatSeconds(eta))
655
656   return utils.CommaJoin(parts)
657
658
659 class ImportExportLoop:
660   MIN_DELAY = 1.0
661   MAX_DELAY = 20.0
662
663   def __init__(self, lu):
664     """Initializes this class.
665
666     """
667     self._lu = lu
668     self._queue = []
669     self._pending_add = []
670
671   def Add(self, diskie):
672     """Adds an import/export object to the loop.
673
674     @type diskie: Subclass of L{_DiskImportExportBase}
675     @param diskie: Import/export object
676
677     """
678     assert diskie not in self._pending_add
679     assert diskie.loop is None
680
681     diskie.SetLoop(self)
682
683     # Adding new objects to a staging list is necessary, otherwise the main
684     # loop gets confused if callbacks modify the queue while the main loop is
685     # iterating over it.
686     self._pending_add.append(diskie)
687
688   @staticmethod
689   def _CollectDaemonStatus(lu, daemons):
690     """Collects the status for all import/export daemons.
691
692     """
693     daemon_status = {}
694
695     for node_name, names in daemons.iteritems():
696       result = lu.rpc.call_impexp_status(node_name, names)
697       if result.fail_msg:
698         lu.LogWarning("Failed to get daemon status on %s: %s",
699                       node_name, result.fail_msg)
700         continue
701
702       assert len(names) == len(result.payload)
703
704       daemon_status[node_name] = dict(zip(names, result.payload))
705
706     return daemon_status
707
708   @staticmethod
709   def _GetActiveDaemonNames(queue):
710     """Gets the names of all active daemons.
711
712     """
713     result = {}
714     for diskie in queue:
715       if not diskie.active:
716         continue
717
718       try:
719         # Start daemon if necessary
720         daemon_name = diskie.CheckDaemon()
721       except _ImportExportError, err:
722         logging.exception("%s failed", diskie.MODE_TEXT)
723         diskie.Finalize(error=str(err))
724         continue
725
726       result.setdefault(diskie.node_name, []).append(daemon_name)
727
728     assert len(queue) >= len(result)
729     assert len(queue) >= sum([len(names) for names in result.itervalues()])
730
731     logging.debug("daemons=%r", result)
732
733     return result
734
735   def _AddPendingToQueue(self):
736     """Adds all pending import/export objects to the internal queue.
737
738     """
739     assert compat.all(diskie not in self._queue and diskie.loop == self
740                       for diskie in self._pending_add)
741
742     self._queue.extend(self._pending_add)
743
744     del self._pending_add[:]
745
746   def Run(self):
747     """Utility main loop.
748
749     """
750     while True:
751       self._AddPendingToQueue()
752
753       # Collect all active daemon names
754       daemons = self._GetActiveDaemonNames(self._queue)
755       if not daemons:
756         break
757
758       # Collection daemon status data
759       data = self._CollectDaemonStatus(self._lu, daemons)
760
761       # Use data
762       delay = self.MAX_DELAY
763       for diskie in self._queue:
764         if not diskie.active:
765           continue
766
767         try:
768           try:
769             all_daemon_data = data[diskie.node_name]
770           except KeyError:
771             result = diskie.SetDaemonData(False, None)
772           else:
773             result = \
774               diskie.SetDaemonData(True,
775                                    all_daemon_data[diskie.GetDaemonName()])
776
777           if not result:
778             # Daemon not yet ready, retry soon
779             delay = min(3.0, delay)
780             continue
781
782           if diskie.CheckFinished():
783             # Transfer finished
784             diskie.Finalize()
785             continue
786
787           # Normal case: check again in 5 seconds
788           delay = min(5.0, delay)
789
790           if not diskie.CheckListening():
791             # Not yet listening, retry soon
792             delay = min(1.0, delay)
793             continue
794
795           if not diskie.CheckConnected():
796             # Not yet connected, retry soon
797             delay = min(1.0, delay)
798             continue
799
800         except _ImportExportError, err:
801           logging.exception("%s failed", diskie.MODE_TEXT)
802           diskie.Finalize(error=str(err))
803
804       if not compat.any(diskie.active for diskie in self._queue):
805         break
806
807       # Wait a bit
808       delay = min(self.MAX_DELAY, max(self.MIN_DELAY, delay))
809       logging.debug("Waiting for %ss", delay)
810       time.sleep(delay)
811
812   def FinalizeAll(self):
813     """Finalizes all pending transfers.
814
815     """
816     success = True
817
818     for diskie in self._queue:
819       success = diskie.Finalize() and success
820
821     return success
822
823
824 class _TransferInstCbBase(ImportExportCbBase):
825   def __init__(self, lu, feedback_fn, instance, timeouts, src_node, src_cbs,
826                dest_node, dest_ip):
827     """Initializes this class.
828
829     """
830     ImportExportCbBase.__init__(self)
831
832     self.lu = lu
833     self.feedback_fn = feedback_fn
834     self.instance = instance
835     self.timeouts = timeouts
836     self.src_node = src_node
837     self.src_cbs = src_cbs
838     self.dest_node = dest_node
839     self.dest_ip = dest_ip
840
841
842 class _TransferInstSourceCb(_TransferInstCbBase):
843   def ReportConnected(self, ie, dtp):
844     """Called when a connection has been established.
845
846     """
847     assert self.src_cbs is None
848     assert dtp.src_export == ie
849     assert dtp.dest_import
850
851     self.feedback_fn("%s is sending data on %s" %
852                      (dtp.data.name, ie.node_name))
853
854   def ReportProgress(self, ie, dtp):
855     """Called when new progress information should be reported.
856
857     """
858     progress = ie.progress
859     if not progress:
860       return
861
862     self.feedback_fn("%s sent %s" % (dtp.data.name, FormatProgress(progress)))
863
864   def ReportFinished(self, ie, dtp):
865     """Called when a transfer has finished.
866
867     """
868     assert self.src_cbs is None
869     assert dtp.src_export == ie
870     assert dtp.dest_import
871
872     if ie.success:
873       self.feedback_fn("%s finished sending data" % dtp.data.name)
874     else:
875       self.feedback_fn("%s failed to send data: %s (recent output: %s)" %
876                        (dtp.data.name, ie.final_message, ie.recent_output))
877
878     dtp.RecordResult(ie.success)
879
880     cb = dtp.data.finished_fn
881     if cb:
882       cb()
883
884     # TODO: Check whether sending SIGTERM right away is okay, maybe we should
885     # give the daemon a moment to sort things out
886     if dtp.dest_import and not ie.success:
887       dtp.dest_import.Abort()
888
889
890 class _TransferInstDestCb(_TransferInstCbBase):
891   def ReportListening(self, ie, dtp, component):
892     """Called when daemon started listening.
893
894     """
895     assert self.src_cbs
896     assert dtp.src_export is None
897     assert dtp.dest_import
898     assert dtp.export_opts
899
900     self.feedback_fn("%s is now listening, starting export" % dtp.data.name)
901
902     # Start export on source node
903     de = DiskExport(self.lu, self.src_node, dtp.export_opts,
904                     self.dest_ip, ie.listen_port, self.instance,
905                     component, dtp.data.src_io, dtp.data.src_ioargs,
906                     self.timeouts, self.src_cbs, private=dtp)
907     ie.loop.Add(de)
908
909     dtp.src_export = de
910
911   def ReportConnected(self, ie, dtp):
912     """Called when a connection has been established.
913
914     """
915     self.feedback_fn("%s is receiving data on %s" %
916                      (dtp.data.name, self.dest_node))
917
918   def ReportFinished(self, ie, dtp):
919     """Called when a transfer has finished.
920
921     """
922     if ie.success:
923       self.feedback_fn("%s finished receiving data" % dtp.data.name)
924     else:
925       self.feedback_fn("%s failed to receive data: %s (recent output: %s)" %
926                        (dtp.data.name, ie.final_message, ie.recent_output))
927
928     dtp.RecordResult(ie.success)
929
930     # TODO: Check whether sending SIGTERM right away is okay, maybe we should
931     # give the daemon a moment to sort things out
932     if dtp.src_export and not ie.success:
933       dtp.src_export.Abort()
934
935
936 class DiskTransfer(object):
937   def __init__(self, name, src_io, src_ioargs, dest_io, dest_ioargs,
938                finished_fn):
939     """Initializes this class.
940
941     @type name: string
942     @param name: User-visible name for this transfer (e.g. "disk/0")
943     @param src_io: Source I/O type
944     @param src_ioargs: Source I/O arguments
945     @param dest_io: Destination I/O type
946     @param dest_ioargs: Destination I/O arguments
947     @type finished_fn: callable
948     @param finished_fn: Function called once transfer has finished
949
950     """
951     self.name = name
952
953     self.src_io = src_io
954     self.src_ioargs = src_ioargs
955
956     self.dest_io = dest_io
957     self.dest_ioargs = dest_ioargs
958
959     self.finished_fn = finished_fn
960
961
962 class _DiskTransferPrivate(object):
963   def __init__(self, data, success, export_opts):
964     """Initializes this class.
965
966     @type data: L{DiskTransfer}
967     @type success: bool
968
969     """
970     self.data = data
971     self.success = success
972     self.export_opts = export_opts
973
974     self.src_export = None
975     self.dest_import = None
976
977   def RecordResult(self, success):
978     """Updates the status.
979
980     One failed part will cause the whole transfer to fail.
981
982     """
983     self.success = self.success and success
984
985
986 def _GetInstDiskMagic(base, instance_name, index):
987   """Computes the magic value for a disk export or import.
988
989   @type base: string
990   @param base: Random seed value (can be the same for all disks of a transfer)
991   @type instance_name: string
992   @param instance_name: Name of instance
993   @type index: number
994   @param index: Disk index
995
996   """
997   h = compat.sha1_hash()
998   h.update(str(constants.RIE_VERSION))
999   h.update(base)
1000   h.update(instance_name)
1001   h.update(str(index))
1002   return h.hexdigest()
1003
1004
1005 def TransferInstanceData(lu, feedback_fn, src_node, dest_node, dest_ip,
1006                          instance, all_transfers):
1007   """Transfers an instance's data from one node to another.
1008
1009   @param lu: Logical unit instance
1010   @param feedback_fn: Feedback function
1011   @type src_node: string
1012   @param src_node: Source node name
1013   @type dest_node: string
1014   @param dest_node: Destination node name
1015   @type dest_ip: string
1016   @param dest_ip: IP address of destination node
1017   @type instance: L{objects.Instance}
1018   @param instance: Instance object
1019   @type all_transfers: list of L{DiskTransfer} instances
1020   @param all_transfers: List of all disk transfers to be made
1021   @rtype: list
1022   @return: List with a boolean (True=successful, False=failed) for success for
1023            each transfer
1024
1025   """
1026   # Disable compression for all moves as these are all within the same cluster
1027   compress = constants.IEC_NONE
1028
1029   logging.debug("Source node %s, destination node %s, compression '%s'",
1030                 src_node, dest_node, compress)
1031
1032   timeouts = ImportExportTimeouts(constants.DISK_TRANSFER_CONNECT_TIMEOUT)
1033   src_cbs = _TransferInstSourceCb(lu, feedback_fn, instance, timeouts,
1034                                   src_node, None, dest_node, dest_ip)
1035   dest_cbs = _TransferInstDestCb(lu, feedback_fn, instance, timeouts,
1036                                  src_node, src_cbs, dest_node, dest_ip)
1037
1038   all_dtp = []
1039
1040   base_magic = utils.GenerateSecret(6)
1041
1042   ieloop = ImportExportLoop(lu)
1043   try:
1044     for idx, transfer in enumerate(all_transfers):
1045       if transfer:
1046         feedback_fn("Exporting %s from %s to %s" %
1047                     (transfer.name, src_node, dest_node))
1048
1049         magic = _GetInstDiskMagic(base_magic, instance.name, idx)
1050         opts = objects.ImportExportOptions(key_name=None, ca_pem=None,
1051                                            compress=compress, magic=magic)
1052
1053         dtp = _DiskTransferPrivate(transfer, True, opts)
1054
1055         di = DiskImport(lu, dest_node, opts, instance, "disk%d" % idx,
1056                         transfer.dest_io, transfer.dest_ioargs,
1057                         timeouts, dest_cbs, private=dtp)
1058         ieloop.Add(di)
1059
1060         dtp.dest_import = di
1061       else:
1062         dtp = _DiskTransferPrivate(None, False, None)
1063
1064       all_dtp.append(dtp)
1065
1066     ieloop.Run()
1067   finally:
1068     ieloop.FinalizeAll()
1069
1070   assert len(all_dtp) == len(all_transfers)
1071   assert compat.all((dtp.src_export is None or
1072                       dtp.src_export.success is not None) and
1073                      (dtp.dest_import is None or
1074                       dtp.dest_import.success is not None)
1075                      for dtp in all_dtp), \
1076          "Not all imports/exports are finalized"
1077
1078   return [bool(dtp.success) for dtp in all_dtp]
1079
1080
1081 class _RemoteExportCb(ImportExportCbBase):
1082   def __init__(self, feedback_fn, disk_count):
1083     """Initializes this class.
1084
1085     """
1086     ImportExportCbBase.__init__(self)
1087     self._feedback_fn = feedback_fn
1088     self._dresults = [None] * disk_count
1089
1090   @property
1091   def disk_results(self):
1092     """Returns per-disk results.
1093
1094     """
1095     return self._dresults
1096
1097   def ReportConnected(self, ie, private):
1098     """Called when a connection has been established.
1099
1100     """
1101     (idx, _) = private
1102
1103     self._feedback_fn("Disk %s is now sending data" % idx)
1104
1105   def ReportProgress(self, ie, private):
1106     """Called when new progress information should be reported.
1107
1108     """
1109     (idx, _) = private
1110
1111     progress = ie.progress
1112     if not progress:
1113       return
1114
1115     self._feedback_fn("Disk %s sent %s" % (idx, FormatProgress(progress)))
1116
1117   def ReportFinished(self, ie, private):
1118     """Called when a transfer has finished.
1119
1120     """
1121     (idx, finished_fn) = private
1122
1123     if ie.success:
1124       self._feedback_fn("Disk %s finished sending data" % idx)
1125     else:
1126       self._feedback_fn("Disk %s failed to send data: %s (recent output: %s)" %
1127                         (idx, ie.final_message, ie.recent_output))
1128
1129     self._dresults[idx] = bool(ie.success)
1130
1131     if finished_fn:
1132       finished_fn()
1133
1134
1135 class ExportInstanceHelper:
1136   def __init__(self, lu, feedback_fn, instance):
1137     """Initializes this class.
1138
1139     @param lu: Logical unit instance
1140     @param feedback_fn: Feedback function
1141     @type instance: L{objects.Instance}
1142     @param instance: Instance object
1143
1144     """
1145     self._lu = lu
1146     self._feedback_fn = feedback_fn
1147     self._instance = instance
1148
1149     self._snap_disks = []
1150     self._removed_snaps = [False] * len(instance.disks)
1151
1152   def CreateSnapshots(self):
1153     """Creates an LVM snapshot for every disk of the instance.
1154
1155     """
1156     assert not self._snap_disks
1157
1158     instance = self._instance
1159     src_node = instance.primary_node
1160
1161     for idx, disk in enumerate(instance.disks):
1162       self._feedback_fn("Creating a snapshot of disk/%s on node %s" %
1163                         (idx, src_node))
1164
1165       # result.payload will be a snapshot of an lvm leaf of the one we
1166       # passed
1167       result = self._lu.rpc.call_blockdev_snapshot(src_node, disk)
1168       new_dev = False
1169       msg = result.fail_msg
1170       if msg:
1171         self._lu.LogWarning("Could not snapshot disk/%s on node %s: %s",
1172                             idx, src_node, msg)
1173       elif (not isinstance(result.payload, (tuple, list)) or
1174             len(result.payload) != 2):
1175         self._lu.LogWarning("Could not snapshot disk/%s on node %s: invalid"
1176                             " result '%s'", idx, src_node, result.payload)
1177       else:
1178         disk_id = tuple(result.payload)
1179         disk_params = constants.DISK_LD_DEFAULTS[constants.LD_LV].copy()
1180         new_dev = objects.Disk(dev_type=constants.LD_LV, size=disk.size,
1181                                logical_id=disk_id, physical_id=disk_id,
1182                                iv_name=disk.iv_name,
1183                                params=disk_params)
1184
1185       self._snap_disks.append(new_dev)
1186
1187     assert len(self._snap_disks) == len(instance.disks)
1188     assert len(self._removed_snaps) == len(instance.disks)
1189
1190   def _RemoveSnapshot(self, disk_index):
1191     """Removes an LVM snapshot.
1192
1193     @type disk_index: number
1194     @param disk_index: Index of the snapshot to be removed
1195
1196     """
1197     disk = self._snap_disks[disk_index]
1198     if disk and not self._removed_snaps[disk_index]:
1199       src_node = self._instance.primary_node
1200
1201       self._feedback_fn("Removing snapshot of disk/%s on node %s" %
1202                         (disk_index, src_node))
1203
1204       result = self._lu.rpc.call_blockdev_remove(src_node, disk)
1205       if result.fail_msg:
1206         self._lu.LogWarning("Could not remove snapshot for disk/%d from node"
1207                             " %s: %s", disk_index, src_node, result.fail_msg)
1208       else:
1209         self._removed_snaps[disk_index] = True
1210
1211   def LocalExport(self, dest_node):
1212     """Intra-cluster instance export.
1213
1214     @type dest_node: L{objects.Node}
1215     @param dest_node: Destination node
1216
1217     """
1218     instance = self._instance
1219     src_node = instance.primary_node
1220
1221     assert len(self._snap_disks) == len(instance.disks)
1222
1223     transfers = []
1224
1225     for idx, dev in enumerate(self._snap_disks):
1226       if not dev:
1227         transfers.append(None)
1228         continue
1229
1230       path = utils.PathJoin(constants.EXPORT_DIR, "%s.new" % instance.name,
1231                             dev.physical_id[1])
1232
1233       finished_fn = compat.partial(self._TransferFinished, idx)
1234
1235       # FIXME: pass debug option from opcode to backend
1236       dt = DiskTransfer("snapshot/%s" % idx,
1237                         constants.IEIO_SCRIPT, (dev, idx),
1238                         constants.IEIO_FILE, (path, ),
1239                         finished_fn)
1240       transfers.append(dt)
1241
1242     # Actually export data
1243     dresults = TransferInstanceData(self._lu, self._feedback_fn,
1244                                     src_node, dest_node.name,
1245                                     dest_node.secondary_ip,
1246                                     instance, transfers)
1247
1248     assert len(dresults) == len(instance.disks)
1249
1250     self._feedback_fn("Finalizing export on %s" % dest_node.name)
1251     result = self._lu.rpc.call_finalize_export(dest_node.name, instance,
1252                                                self._snap_disks)
1253     msg = result.fail_msg
1254     fin_resu = not msg
1255     if msg:
1256       self._lu.LogWarning("Could not finalize export for instance %s"
1257                           " on node %s: %s", instance.name, dest_node.name, msg)
1258
1259     return (fin_resu, dresults)
1260
1261   def RemoteExport(self, disk_info, key_name, dest_ca_pem, timeouts):
1262     """Inter-cluster instance export.
1263
1264     @type disk_info: list
1265     @param disk_info: Per-disk destination information
1266     @type key_name: string
1267     @param key_name: Name of X509 key to use
1268     @type dest_ca_pem: string
1269     @param dest_ca_pem: Destination X509 CA in PEM format
1270     @type timeouts: L{ImportExportTimeouts}
1271     @param timeouts: Timeouts for this import
1272
1273     """
1274     instance = self._instance
1275
1276     assert len(disk_info) == len(instance.disks)
1277
1278     cbs = _RemoteExportCb(self._feedback_fn, len(instance.disks))
1279
1280     ieloop = ImportExportLoop(self._lu)
1281     try:
1282       for idx, (dev, (host, port, magic)) in enumerate(zip(instance.disks,
1283                                                            disk_info)):
1284         # Decide whether to use IPv6
1285         ipv6 = netutils.IP6Address.IsValid(host)
1286
1287         opts = objects.ImportExportOptions(key_name=key_name,
1288                                            ca_pem=dest_ca_pem,
1289                                            magic=magic, ipv6=ipv6)
1290
1291         self._feedback_fn("Sending disk %s to %s:%s" % (idx, host, port))
1292         finished_fn = compat.partial(self._TransferFinished, idx)
1293         ieloop.Add(DiskExport(self._lu, instance.primary_node,
1294                               opts, host, port, instance, "disk%d" % idx,
1295                               constants.IEIO_SCRIPT, (dev, idx),
1296                               timeouts, cbs, private=(idx, finished_fn)))
1297
1298       ieloop.Run()
1299     finally:
1300       ieloop.FinalizeAll()
1301
1302     return (True, cbs.disk_results)
1303
1304   def _TransferFinished(self, idx):
1305     """Called once a transfer has finished.
1306
1307     @type idx: number
1308     @param idx: Disk index
1309
1310     """
1311     logging.debug("Transfer %s finished", idx)
1312     self._RemoveSnapshot(idx)
1313
1314   def Cleanup(self):
1315     """Remove all snapshots.
1316
1317     """
1318     assert len(self._removed_snaps) == len(self._instance.disks)
1319     for idx in range(len(self._instance.disks)):
1320       self._RemoveSnapshot(idx)
1321
1322
1323 class _RemoteImportCb(ImportExportCbBase):
1324   def __init__(self, feedback_fn, cds, x509_cert_pem, disk_count,
1325                external_address):
1326     """Initializes this class.
1327
1328     @type cds: string
1329     @param cds: Cluster domain secret
1330     @type x509_cert_pem: string
1331     @param x509_cert_pem: CA used for signing import key
1332     @type disk_count: number
1333     @param disk_count: Number of disks
1334     @type external_address: string
1335     @param external_address: External address of destination node
1336
1337     """
1338     ImportExportCbBase.__init__(self)
1339     self._feedback_fn = feedback_fn
1340     self._cds = cds
1341     self._x509_cert_pem = x509_cert_pem
1342     self._disk_count = disk_count
1343     self._external_address = external_address
1344
1345     self._dresults = [None] * disk_count
1346     self._daemon_port = [None] * disk_count
1347
1348     self._salt = utils.GenerateSecret(8)
1349
1350   @property
1351   def disk_results(self):
1352     """Returns per-disk results.
1353
1354     """
1355     return self._dresults
1356
1357   def _CheckAllListening(self):
1358     """Checks whether all daemons are listening.
1359
1360     If all daemons are listening, the information is sent to the client.
1361
1362     """
1363     if not compat.all(dp is not None for dp in self._daemon_port):
1364       return
1365
1366     host = self._external_address
1367
1368     disks = []
1369     for idx, (port, magic) in enumerate(self._daemon_port):
1370       disks.append(ComputeRemoteImportDiskInfo(self._cds, self._salt,
1371                                                idx, host, port, magic))
1372
1373     assert len(disks) == self._disk_count
1374
1375     self._feedback_fn(constants.ELOG_REMOTE_IMPORT, {
1376       "disks": disks,
1377       "x509_ca": self._x509_cert_pem,
1378       })
1379
1380   def ReportListening(self, ie, private, _):
1381     """Called when daemon started listening.
1382
1383     """
1384     (idx, ) = private
1385
1386     self._feedback_fn("Disk %s is now listening" % idx)
1387
1388     assert self._daemon_port[idx] is None
1389
1390     self._daemon_port[idx] = (ie.listen_port, ie.magic)
1391
1392     self._CheckAllListening()
1393
1394   def ReportConnected(self, ie, private):
1395     """Called when a connection has been established.
1396
1397     """
1398     (idx, ) = private
1399
1400     self._feedback_fn("Disk %s is now receiving data" % idx)
1401
1402   def ReportFinished(self, ie, private):
1403     """Called when a transfer has finished.
1404
1405     """
1406     (idx, ) = private
1407
1408     # Daemon is certainly no longer listening
1409     self._daemon_port[idx] = None
1410
1411     if ie.success:
1412       self._feedback_fn("Disk %s finished receiving data" % idx)
1413     else:
1414       self._feedback_fn(("Disk %s failed to receive data: %s"
1415                          " (recent output: %s)") %
1416                         (idx, ie.final_message, ie.recent_output))
1417
1418     self._dresults[idx] = bool(ie.success)
1419
1420
1421 def RemoteImport(lu, feedback_fn, instance, pnode, source_x509_ca,
1422                  cds, timeouts):
1423   """Imports an instance from another cluster.
1424
1425   @param lu: Logical unit instance
1426   @param feedback_fn: Feedback function
1427   @type instance: L{objects.Instance}
1428   @param instance: Instance object
1429   @type pnode: L{objects.Node}
1430   @param pnode: Primary node of instance as an object
1431   @type source_x509_ca: OpenSSL.crypto.X509
1432   @param source_x509_ca: Import source's X509 CA
1433   @type cds: string
1434   @param cds: Cluster domain secret
1435   @type timeouts: L{ImportExportTimeouts}
1436   @param timeouts: Timeouts for this import
1437
1438   """
1439   source_ca_pem = OpenSSL.crypto.dump_certificate(OpenSSL.crypto.FILETYPE_PEM,
1440                                                   source_x509_ca)
1441
1442   magic_base = utils.GenerateSecret(6)
1443
1444   # Decide whether to use IPv6
1445   ipv6 = netutils.IP6Address.IsValid(pnode.primary_ip)
1446
1447   # Create crypto key
1448   result = lu.rpc.call_x509_cert_create(instance.primary_node,
1449                                         constants.RIE_CERT_VALIDITY)
1450   result.Raise("Can't create X509 key and certificate on %s" % result.node)
1451
1452   (x509_key_name, x509_cert_pem) = result.payload
1453   try:
1454     # Load certificate
1455     x509_cert = OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_PEM,
1456                                                 x509_cert_pem)
1457
1458     # Sign certificate
1459     signed_x509_cert_pem = \
1460       utils.SignX509Certificate(x509_cert, cds, utils.GenerateSecret(8))
1461
1462     cbs = _RemoteImportCb(feedback_fn, cds, signed_x509_cert_pem,
1463                           len(instance.disks), pnode.primary_ip)
1464
1465     ieloop = ImportExportLoop(lu)
1466     try:
1467       for idx, dev in enumerate(instance.disks):
1468         magic = _GetInstDiskMagic(magic_base, instance.name, idx)
1469
1470         # Import daemon options
1471         opts = objects.ImportExportOptions(key_name=x509_key_name,
1472                                            ca_pem=source_ca_pem,
1473                                            magic=magic, ipv6=ipv6)
1474
1475         ieloop.Add(DiskImport(lu, instance.primary_node, opts, instance,
1476                               "disk%d" % idx,
1477                               constants.IEIO_SCRIPT, (dev, idx),
1478                               timeouts, cbs, private=(idx, )))
1479
1480       ieloop.Run()
1481     finally:
1482       ieloop.FinalizeAll()
1483   finally:
1484     # Remove crypto key and certificate
1485     result = lu.rpc.call_x509_cert_remove(instance.primary_node, x509_key_name)
1486     result.Raise("Can't remove X509 key and certificate on %s" % result.node)
1487
1488   return cbs.disk_results
1489
1490
1491 def _GetImportExportHandshakeMessage(version):
1492   """Returns the handshake message for a RIE protocol version.
1493
1494   @type version: number
1495
1496   """
1497   return "%s:%s" % (version, constants.RIE_HANDSHAKE)
1498
1499
1500 def ComputeRemoteExportHandshake(cds):
1501   """Computes the remote import/export handshake.
1502
1503   @type cds: string
1504   @param cds: Cluster domain secret
1505
1506   """
1507   salt = utils.GenerateSecret(8)
1508   msg = _GetImportExportHandshakeMessage(constants.RIE_VERSION)
1509   return (constants.RIE_VERSION, utils.Sha1Hmac(cds, msg, salt=salt), salt)
1510
1511
1512 def CheckRemoteExportHandshake(cds, handshake):
1513   """Checks the handshake of a remote import/export.
1514
1515   @type cds: string
1516   @param cds: Cluster domain secret
1517   @type handshake: sequence
1518   @param handshake: Handshake sent by remote peer
1519
1520   """
1521   try:
1522     (version, hmac_digest, hmac_salt) = handshake
1523   except (TypeError, ValueError), err:
1524     return "Invalid data: %s" % err
1525
1526   if not utils.VerifySha1Hmac(cds, _GetImportExportHandshakeMessage(version),
1527                               hmac_digest, salt=hmac_salt):
1528     return "Hash didn't match, clusters don't share the same domain secret"
1529
1530   if version != constants.RIE_VERSION:
1531     return ("Clusters don't have the same remote import/export protocol"
1532             " (local=%s, remote=%s)" %
1533             (constants.RIE_VERSION, version))
1534
1535   return None
1536
1537
1538 def _GetRieDiskInfoMessage(disk_index, host, port, magic):
1539   """Returns the hashed text for import/export disk information.
1540
1541   @type disk_index: number
1542   @param disk_index: Index of disk (included in hash)
1543   @type host: string
1544   @param host: Hostname
1545   @type port: number
1546   @param port: Daemon port
1547   @type magic: string
1548   @param magic: Magic value
1549
1550   """
1551   return "%s:%s:%s:%s" % (disk_index, host, port, magic)
1552
1553
1554 def CheckRemoteExportDiskInfo(cds, disk_index, disk_info):
1555   """Verifies received disk information for an export.
1556
1557   @type cds: string
1558   @param cds: Cluster domain secret
1559   @type disk_index: number
1560   @param disk_index: Index of disk (included in hash)
1561   @type disk_info: sequence
1562   @param disk_info: Disk information sent by remote peer
1563
1564   """
1565   try:
1566     (host, port, magic, hmac_digest, hmac_salt) = disk_info
1567   except (TypeError, ValueError), err:
1568     raise errors.GenericError("Invalid data: %s" % err)
1569
1570   if not (host and port and magic):
1571     raise errors.GenericError("Missing destination host, port or magic")
1572
1573   msg = _GetRieDiskInfoMessage(disk_index, host, port, magic)
1574
1575   if not utils.VerifySha1Hmac(cds, msg, hmac_digest, salt=hmac_salt):
1576     raise errors.GenericError("HMAC is wrong")
1577
1578   if netutils.IP6Address.IsValid(host) or netutils.IP4Address.IsValid(host):
1579     destination = host
1580   else:
1581     destination = netutils.Hostname.GetNormalizedName(host)
1582
1583   return (destination,
1584           utils.ValidateServiceName(port),
1585           magic)
1586
1587
1588 def ComputeRemoteImportDiskInfo(cds, salt, disk_index, host, port, magic):
1589   """Computes the signed disk information for a remote import.
1590
1591   @type cds: string
1592   @param cds: Cluster domain secret
1593   @type salt: string
1594   @param salt: HMAC salt
1595   @type disk_index: number
1596   @param disk_index: Index of disk (included in hash)
1597   @type host: string
1598   @param host: Hostname
1599   @type port: number
1600   @param port: Daemon port
1601   @type magic: string
1602   @param magic: Magic value
1603
1604   """
1605   msg = _GetRieDiskInfoMessage(disk_index, host, port, magic)
1606   hmac_digest = utils.Sha1Hmac(cds, msg, salt=salt)
1607   return (host, port, magic, hmac_digest, salt)