83e742411607239c56a22776589d8eddcd305ea3
[ganeti-local] / lib / masterd / instance.py
1 #
2 #
3
4 # Copyright (C) 2010, 2011 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Instance-related functions and classes for masterd.
23
24 """
25
26 import logging
27 import time
28 import OpenSSL
29
30 from ganeti import constants
31 from ganeti import errors
32 from ganeti import compat
33 from ganeti import utils
34 from ganeti import objects
35 from ganeti import netutils
36 from ganeti import pathutils
37
38
39 class _ImportExportError(Exception):
40   """Local exception to report import/export errors.
41
42   """
43
44
45 class ImportExportTimeouts(object):
46   #: Time until daemon starts writing status file
47   DEFAULT_READY_TIMEOUT = 10
48
49   #: Length of time until errors cause hard failure
50   DEFAULT_ERROR_TIMEOUT = 10
51
52   #: Time after which daemon must be listening
53   DEFAULT_LISTEN_TIMEOUT = 10
54
55   #: Progress update interval
56   DEFAULT_PROGRESS_INTERVAL = 60
57
58   __slots__ = [
59     "error",
60     "ready",
61     "listen",
62     "connect",
63     "progress",
64     ]
65
66   def __init__(self, connect,
67                listen=DEFAULT_LISTEN_TIMEOUT,
68                error=DEFAULT_ERROR_TIMEOUT,
69                ready=DEFAULT_READY_TIMEOUT,
70                progress=DEFAULT_PROGRESS_INTERVAL):
71     """Initializes this class.
72
73     @type connect: number
74     @param connect: Timeout for establishing connection
75     @type listen: number
76     @param listen: Timeout for starting to listen for connections
77     @type error: number
78     @param error: Length of time until errors cause hard failure
79     @type ready: number
80     @param ready: Timeout for daemon to become ready
81     @type progress: number
82     @param progress: Progress update interval
83
84     """
85     self.error = error
86     self.ready = ready
87     self.listen = listen
88     self.connect = connect
89     self.progress = progress
90
91
92 class ImportExportCbBase(object):
93   """Callbacks for disk import/export.
94
95   """
96   def ReportListening(self, ie, private, component):
97     """Called when daemon started listening.
98
99     @type ie: Subclass of L{_DiskImportExportBase}
100     @param ie: Import/export object
101     @param private: Private data passed to import/export object
102     @param component: transfer component name
103
104     """
105
106   def ReportConnected(self, ie, private):
107     """Called when a connection has been established.
108
109     @type ie: Subclass of L{_DiskImportExportBase}
110     @param ie: Import/export object
111     @param private: Private data passed to import/export object
112
113     """
114
115   def ReportProgress(self, ie, private):
116     """Called when new progress information should be reported.
117
118     @type ie: Subclass of L{_DiskImportExportBase}
119     @param ie: Import/export object
120     @param private: Private data passed to import/export object
121
122     """
123
124   def ReportFinished(self, ie, private):
125     """Called when a transfer has finished.
126
127     @type ie: Subclass of L{_DiskImportExportBase}
128     @param ie: Import/export object
129     @param private: Private data passed to import/export object
130
131     """
132
133
134 class _DiskImportExportBase(object):
135   MODE_TEXT = None
136
137   def __init__(self, lu, node_name, opts,
138                instance, component, timeouts, cbs, private=None):
139     """Initializes this class.
140
141     @param lu: Logical unit instance
142     @type node_name: string
143     @param node_name: Node name for import
144     @type opts: L{objects.ImportExportOptions}
145     @param opts: Import/export daemon options
146     @type instance: L{objects.Instance}
147     @param instance: Instance object
148     @type component: string
149     @param component: which part of the instance is being imported
150     @type timeouts: L{ImportExportTimeouts}
151     @param timeouts: Timeouts for this import
152     @type cbs: L{ImportExportCbBase}
153     @param cbs: Callbacks
154     @param private: Private data for callback functions
155
156     """
157     assert self.MODE_TEXT
158
159     self._lu = lu
160     self.node_name = node_name
161     self._opts = opts.Copy()
162     self._instance = instance
163     self._component = component
164     self._timeouts = timeouts
165     self._cbs = cbs
166     self._private = private
167
168     # Set master daemon's timeout in options for import/export daemon
169     assert self._opts.connect_timeout is None
170     self._opts.connect_timeout = timeouts.connect
171
172     # Parent loop
173     self._loop = None
174
175     # Timestamps
176     self._ts_begin = None
177     self._ts_connected = None
178     self._ts_finished = None
179     self._ts_cleanup = None
180     self._ts_last_progress = None
181     self._ts_last_error = None
182
183     # Transfer status
184     self.success = None
185     self.final_message = None
186
187     # Daemon status
188     self._daemon_name = None
189     self._daemon = None
190
191   @property
192   def recent_output(self):
193     """Returns the most recent output from the daemon.
194
195     """
196     if self._daemon:
197       return "\n".join(self._daemon.recent_output)
198
199     return None
200
201   @property
202   def progress(self):
203     """Returns transfer progress information.
204
205     """
206     if not self._daemon:
207       return None
208
209     return (self._daemon.progress_mbytes,
210             self._daemon.progress_throughput,
211             self._daemon.progress_percent,
212             self._daemon.progress_eta)
213
214   @property
215   def magic(self):
216     """Returns the magic value for this import/export.
217
218     """
219     return self._opts.magic
220
221   @property
222   def active(self):
223     """Determines whether this transport is still active.
224
225     """
226     return self.success is None
227
228   @property
229   def loop(self):
230     """Returns parent loop.
231
232     @rtype: L{ImportExportLoop}
233
234     """
235     return self._loop
236
237   def SetLoop(self, loop):
238     """Sets the parent loop.
239
240     @type loop: L{ImportExportLoop}
241
242     """
243     if self._loop:
244       raise errors.ProgrammerError("Loop can only be set once")
245
246     self._loop = loop
247
248   def _StartDaemon(self):
249     """Starts the import/export daemon.
250
251     """
252     raise NotImplementedError()
253
254   def CheckDaemon(self):
255     """Checks whether daemon has been started and if not, starts it.
256
257     @rtype: string
258     @return: Daemon name
259
260     """
261     assert self._ts_cleanup is None
262
263     if self._daemon_name is None:
264       assert self._ts_begin is None
265
266       result = self._StartDaemon()
267       if result.fail_msg:
268         raise _ImportExportError("Failed to start %s on %s: %s" %
269                                  (self.MODE_TEXT, self.node_name,
270                                   result.fail_msg))
271
272       daemon_name = result.payload
273
274       logging.info("Started %s '%s' on %s", self.MODE_TEXT, daemon_name,
275                    self.node_name)
276
277       self._ts_begin = time.time()
278       self._daemon_name = daemon_name
279
280     return self._daemon_name
281
282   def GetDaemonName(self):
283     """Returns the daemon name.
284
285     """
286     assert self._daemon_name, "Daemon has not been started"
287     assert self._ts_cleanup is None
288     return self._daemon_name
289
290   def Abort(self):
291     """Sends SIGTERM to import/export daemon (if still active).
292
293     """
294     if self._daemon_name:
295       self._lu.LogWarning("Aborting %s '%s' on %s",
296                           self.MODE_TEXT, self._daemon_name, self.node_name)
297       result = self._lu.rpc.call_impexp_abort(self.node_name, self._daemon_name)
298       if result.fail_msg:
299         self._lu.LogWarning("Failed to abort %s '%s' on %s: %s",
300                             self.MODE_TEXT, self._daemon_name,
301                             self.node_name, result.fail_msg)
302         return False
303
304     return True
305
306   def _SetDaemonData(self, data):
307     """Internal function for updating status daemon data.
308
309     @type data: L{objects.ImportExportStatus}
310     @param data: Daemon status data
311
312     """
313     assert self._ts_begin is not None
314
315     if not data:
316       if utils.TimeoutExpired(self._ts_begin, self._timeouts.ready):
317         raise _ImportExportError("Didn't become ready after %s seconds" %
318                                  self._timeouts.ready)
319
320       return False
321
322     self._daemon = data
323
324     return True
325
326   def SetDaemonData(self, success, data):
327     """Updates daemon status data.
328
329     @type success: bool
330     @param success: Whether fetching data was successful or not
331     @type data: L{objects.ImportExportStatus}
332     @param data: Daemon status data
333
334     """
335     if not success:
336       if self._ts_last_error is None:
337         self._ts_last_error = time.time()
338
339       elif utils.TimeoutExpired(self._ts_last_error, self._timeouts.error):
340         raise _ImportExportError("Too many errors while updating data")
341
342       return False
343
344     self._ts_last_error = None
345
346     return self._SetDaemonData(data)
347
348   def CheckListening(self):
349     """Checks whether the daemon is listening.
350
351     """
352     raise NotImplementedError()
353
354   def _GetConnectedCheckEpoch(self):
355     """Returns timeout to calculate connect timeout.
356
357     """
358     raise NotImplementedError()
359
360   def CheckConnected(self):
361     """Checks whether the daemon is connected.
362
363     @rtype: bool
364     @return: Whether the daemon is connected
365
366     """
367     assert self._daemon, "Daemon status missing"
368
369     if self._ts_connected is not None:
370       return True
371
372     if self._daemon.connected:
373       self._ts_connected = time.time()
374
375       # TODO: Log remote peer
376       logging.debug("%s '%s' on %s is now connected",
377                     self.MODE_TEXT, self._daemon_name, self.node_name)
378
379       self._cbs.ReportConnected(self, self._private)
380
381       return True
382
383     if utils.TimeoutExpired(self._GetConnectedCheckEpoch(),
384                             self._timeouts.connect):
385       raise _ImportExportError("Not connected after %s seconds" %
386                                self._timeouts.connect)
387
388     return False
389
390   def _CheckProgress(self):
391     """Checks whether a progress update should be reported.
392
393     """
394     if ((self._ts_last_progress is None or
395         utils.TimeoutExpired(self._ts_last_progress,
396                              self._timeouts.progress)) and
397         self._daemon and
398         self._daemon.progress_mbytes is not None and
399         self._daemon.progress_throughput is not None):
400       self._cbs.ReportProgress(self, self._private)
401       self._ts_last_progress = time.time()
402
403   def CheckFinished(self):
404     """Checks whether the daemon exited.
405
406     @rtype: bool
407     @return: Whether the transfer is finished
408
409     """
410     assert self._daemon, "Daemon status missing"
411
412     if self._ts_finished:
413       return True
414
415     if self._daemon.exit_status is None:
416       # TODO: Adjust delay for ETA expiring soon
417       self._CheckProgress()
418       return False
419
420     self._ts_finished = time.time()
421
422     self._ReportFinished(self._daemon.exit_status == 0,
423                          self._daemon.error_message)
424
425     return True
426
427   def _ReportFinished(self, success, message):
428     """Transfer is finished or daemon exited.
429
430     @type success: bool
431     @param success: Whether the transfer was successful
432     @type message: string
433     @param message: Error message
434
435     """
436     assert self.success is None
437
438     self.success = success
439     self.final_message = message
440
441     if success:
442       logging.info("%s '%s' on %s succeeded", self.MODE_TEXT,
443                    self._daemon_name, self.node_name)
444     elif self._daemon_name:
445       self._lu.LogWarning("%s '%s' on %s failed: %s",
446                           self.MODE_TEXT, self._daemon_name, self.node_name,
447                           message)
448     else:
449       self._lu.LogWarning("%s on %s failed: %s", self.MODE_TEXT,
450                           self.node_name, message)
451
452     self._cbs.ReportFinished(self, self._private)
453
454   def _Finalize(self):
455     """Makes the RPC call to finalize this import/export.
456
457     """
458     return self._lu.rpc.call_impexp_cleanup(self.node_name, self._daemon_name)
459
460   def Finalize(self, error=None):
461     """Finalizes this import/export.
462
463     """
464     if self._daemon_name:
465       logging.info("Finalizing %s '%s' on %s",
466                    self.MODE_TEXT, self._daemon_name, self.node_name)
467
468       result = self._Finalize()
469       if result.fail_msg:
470         self._lu.LogWarning("Failed to finalize %s '%s' on %s: %s",
471                             self.MODE_TEXT, self._daemon_name,
472                             self.node_name, result.fail_msg)
473         return False
474
475       # Daemon is no longer running
476       self._daemon_name = None
477       self._ts_cleanup = time.time()
478
479     if error:
480       self._ReportFinished(False, error)
481
482     return True
483
484
485 class DiskImport(_DiskImportExportBase):
486   MODE_TEXT = "import"
487
488   def __init__(self, lu, node_name, opts, instance, component,
489                dest, dest_args, timeouts, cbs, private=None):
490     """Initializes this class.
491
492     @param lu: Logical unit instance
493     @type node_name: string
494     @param node_name: Node name for import
495     @type opts: L{objects.ImportExportOptions}
496     @param opts: Import/export daemon options
497     @type instance: L{objects.Instance}
498     @param instance: Instance object
499     @type component: string
500     @param component: which part of the instance is being imported
501     @param dest: I/O destination
502     @param dest_args: I/O arguments
503     @type timeouts: L{ImportExportTimeouts}
504     @param timeouts: Timeouts for this import
505     @type cbs: L{ImportExportCbBase}
506     @param cbs: Callbacks
507     @param private: Private data for callback functions
508
509     """
510     _DiskImportExportBase.__init__(self, lu, node_name, opts, instance,
511                                    component, timeouts, cbs, private)
512     self._dest = dest
513     self._dest_args = dest_args
514
515     # Timestamps
516     self._ts_listening = None
517
518   @property
519   def listen_port(self):
520     """Returns the port the daemon is listening on.
521
522     """
523     if self._daemon:
524       return self._daemon.listen_port
525
526     return None
527
528   def _StartDaemon(self):
529     """Starts the import daemon.
530
531     """
532     return self._lu.rpc.call_import_start(self.node_name, self._opts,
533                                           self._instance, self._component,
534                                           (self._dest, self._dest_args))
535
536   def CheckListening(self):
537     """Checks whether the daemon is listening.
538
539     @rtype: bool
540     @return: Whether the daemon is listening
541
542     """
543     assert self._daemon, "Daemon status missing"
544
545     if self._ts_listening is not None:
546       return True
547
548     port = self._daemon.listen_port
549     if port is not None:
550       self._ts_listening = time.time()
551
552       logging.debug("Import '%s' on %s is now listening on port %s",
553                     self._daemon_name, self.node_name, port)
554
555       self._cbs.ReportListening(self, self._private, self._component)
556
557       return True
558
559     if utils.TimeoutExpired(self._ts_begin, self._timeouts.listen):
560       raise _ImportExportError("Not listening after %s seconds" %
561                                self._timeouts.listen)
562
563     return False
564
565   def _GetConnectedCheckEpoch(self):
566     """Returns the time since we started listening.
567
568     """
569     assert self._ts_listening is not None, \
570            ("Checking whether an import is connected is only useful"
571             " once it's been listening")
572
573     return self._ts_listening
574
575
576 class DiskExport(_DiskImportExportBase):
577   MODE_TEXT = "export"
578
579   def __init__(self, lu, node_name, opts, dest_host, dest_port,
580                instance, component, source, source_args,
581                timeouts, cbs, private=None):
582     """Initializes this class.
583
584     @param lu: Logical unit instance
585     @type node_name: string
586     @param node_name: Node name for import
587     @type opts: L{objects.ImportExportOptions}
588     @param opts: Import/export daemon options
589     @type dest_host: string
590     @param dest_host: Destination host name or IP address
591     @type dest_port: number
592     @param dest_port: Destination port number
593     @type instance: L{objects.Instance}
594     @param instance: Instance object
595     @type component: string
596     @param component: which part of the instance is being imported
597     @param source: I/O source
598     @param source_args: I/O source
599     @type timeouts: L{ImportExportTimeouts}
600     @param timeouts: Timeouts for this import
601     @type cbs: L{ImportExportCbBase}
602     @param cbs: Callbacks
603     @param private: Private data for callback functions
604
605     """
606     _DiskImportExportBase.__init__(self, lu, node_name, opts, instance,
607                                    component, timeouts, cbs, private)
608     self._dest_host = dest_host
609     self._dest_port = dest_port
610     self._source = source
611     self._source_args = source_args
612
613   def _StartDaemon(self):
614     """Starts the export daemon.
615
616     """
617     return self._lu.rpc.call_export_start(self.node_name, self._opts,
618                                           self._dest_host, self._dest_port,
619                                           self._instance, self._component,
620                                           (self._source, self._source_args))
621
622   def CheckListening(self):
623     """Checks whether the daemon is listening.
624
625     """
626     # Only an import can be listening
627     return True
628
629   def _GetConnectedCheckEpoch(self):
630     """Returns the time since the daemon started.
631
632     """
633     assert self._ts_begin is not None
634
635     return self._ts_begin
636
637
638 def FormatProgress(progress):
639   """Formats progress information for user consumption
640
641   """
642   (mbytes, throughput, percent, eta) = progress
643
644   parts = [
645     utils.FormatUnit(mbytes, "h"),
646
647     # Not using FormatUnit as it doesn't support kilobytes
648     "%0.1f MiB/s" % throughput,
649     ]
650
651   if percent is not None:
652     parts.append("%d%%" % percent)
653
654   if eta is not None:
655     parts.append("ETA %s" % utils.FormatSeconds(eta))
656
657   return utils.CommaJoin(parts)
658
659
660 class ImportExportLoop:
661   MIN_DELAY = 1.0
662   MAX_DELAY = 20.0
663
664   def __init__(self, lu):
665     """Initializes this class.
666
667     """
668     self._lu = lu
669     self._queue = []
670     self._pending_add = []
671
672   def Add(self, diskie):
673     """Adds an import/export object to the loop.
674
675     @type diskie: Subclass of L{_DiskImportExportBase}
676     @param diskie: Import/export object
677
678     """
679     assert diskie not in self._pending_add
680     assert diskie.loop is None
681
682     diskie.SetLoop(self)
683
684     # Adding new objects to a staging list is necessary, otherwise the main
685     # loop gets confused if callbacks modify the queue while the main loop is
686     # iterating over it.
687     self._pending_add.append(diskie)
688
689   @staticmethod
690   def _CollectDaemonStatus(lu, daemons):
691     """Collects the status for all import/export daemons.
692
693     """
694     daemon_status = {}
695
696     for node_name, names in daemons.iteritems():
697       result = lu.rpc.call_impexp_status(node_name, names)
698       if result.fail_msg:
699         lu.LogWarning("Failed to get daemon status on %s: %s",
700                       node_name, result.fail_msg)
701         continue
702
703       assert len(names) == len(result.payload)
704
705       daemon_status[node_name] = dict(zip(names, result.payload))
706
707     return daemon_status
708
709   @staticmethod
710   def _GetActiveDaemonNames(queue):
711     """Gets the names of all active daemons.
712
713     """
714     result = {}
715     for diskie in queue:
716       if not diskie.active:
717         continue
718
719       try:
720         # Start daemon if necessary
721         daemon_name = diskie.CheckDaemon()
722       except _ImportExportError, err:
723         logging.exception("%s failed", diskie.MODE_TEXT)
724         diskie.Finalize(error=str(err))
725         continue
726
727       result.setdefault(diskie.node_name, []).append(daemon_name)
728
729     assert len(queue) >= len(result)
730     assert len(queue) >= sum([len(names) for names in result.itervalues()])
731
732     logging.debug("daemons=%r", result)
733
734     return result
735
736   def _AddPendingToQueue(self):
737     """Adds all pending import/export objects to the internal queue.
738
739     """
740     assert compat.all(diskie not in self._queue and diskie.loop == self
741                       for diskie in self._pending_add)
742
743     self._queue.extend(self._pending_add)
744
745     del self._pending_add[:]
746
747   def Run(self):
748     """Utility main loop.
749
750     """
751     while True:
752       self._AddPendingToQueue()
753
754       # Collect all active daemon names
755       daemons = self._GetActiveDaemonNames(self._queue)
756       if not daemons:
757         break
758
759       # Collection daemon status data
760       data = self._CollectDaemonStatus(self._lu, daemons)
761
762       # Use data
763       delay = self.MAX_DELAY
764       for diskie in self._queue:
765         if not diskie.active:
766           continue
767
768         try:
769           try:
770             all_daemon_data = data[diskie.node_name]
771           except KeyError:
772             result = diskie.SetDaemonData(False, None)
773           else:
774             result = \
775               diskie.SetDaemonData(True,
776                                    all_daemon_data[diskie.GetDaemonName()])
777
778           if not result:
779             # Daemon not yet ready, retry soon
780             delay = min(3.0, delay)
781             continue
782
783           if diskie.CheckFinished():
784             # Transfer finished
785             diskie.Finalize()
786             continue
787
788           # Normal case: check again in 5 seconds
789           delay = min(5.0, delay)
790
791           if not diskie.CheckListening():
792             # Not yet listening, retry soon
793             delay = min(1.0, delay)
794             continue
795
796           if not diskie.CheckConnected():
797             # Not yet connected, retry soon
798             delay = min(1.0, delay)
799             continue
800
801         except _ImportExportError, err:
802           logging.exception("%s failed", diskie.MODE_TEXT)
803           diskie.Finalize(error=str(err))
804
805       if not compat.any(diskie.active for diskie in self._queue):
806         break
807
808       # Wait a bit
809       delay = min(self.MAX_DELAY, max(self.MIN_DELAY, delay))
810       logging.debug("Waiting for %ss", delay)
811       time.sleep(delay)
812
813   def FinalizeAll(self):
814     """Finalizes all pending transfers.
815
816     """
817     success = True
818
819     for diskie in self._queue:
820       success = diskie.Finalize() and success
821
822     return success
823
824
825 class _TransferInstCbBase(ImportExportCbBase):
826   def __init__(self, lu, feedback_fn, instance, timeouts, src_node, src_cbs,
827                dest_node, dest_ip):
828     """Initializes this class.
829
830     """
831     ImportExportCbBase.__init__(self)
832
833     self.lu = lu
834     self.feedback_fn = feedback_fn
835     self.instance = instance
836     self.timeouts = timeouts
837     self.src_node = src_node
838     self.src_cbs = src_cbs
839     self.dest_node = dest_node
840     self.dest_ip = dest_ip
841
842
843 class _TransferInstSourceCb(_TransferInstCbBase):
844   def ReportConnected(self, ie, dtp):
845     """Called when a connection has been established.
846
847     """
848     assert self.src_cbs is None
849     assert dtp.src_export == ie
850     assert dtp.dest_import
851
852     self.feedback_fn("%s is sending data on %s" %
853                      (dtp.data.name, ie.node_name))
854
855   def ReportProgress(self, ie, dtp):
856     """Called when new progress information should be reported.
857
858     """
859     progress = ie.progress
860     if not progress:
861       return
862
863     self.feedback_fn("%s sent %s" % (dtp.data.name, FormatProgress(progress)))
864
865   def ReportFinished(self, ie, dtp):
866     """Called when a transfer has finished.
867
868     """
869     assert self.src_cbs is None
870     assert dtp.src_export == ie
871     assert dtp.dest_import
872
873     if ie.success:
874       self.feedback_fn("%s finished sending data" % dtp.data.name)
875     else:
876       self.feedback_fn("%s failed to send data: %s (recent output: %s)" %
877                        (dtp.data.name, ie.final_message, ie.recent_output))
878
879     dtp.RecordResult(ie.success)
880
881     cb = dtp.data.finished_fn
882     if cb:
883       cb()
884
885     # TODO: Check whether sending SIGTERM right away is okay, maybe we should
886     # give the daemon a moment to sort things out
887     if dtp.dest_import and not ie.success:
888       dtp.dest_import.Abort()
889
890
891 class _TransferInstDestCb(_TransferInstCbBase):
892   def ReportListening(self, ie, dtp, component):
893     """Called when daemon started listening.
894
895     """
896     assert self.src_cbs
897     assert dtp.src_export is None
898     assert dtp.dest_import
899     assert dtp.export_opts
900
901     self.feedback_fn("%s is now listening, starting export" % dtp.data.name)
902
903     # Start export on source node
904     de = DiskExport(self.lu, self.src_node, dtp.export_opts,
905                     self.dest_ip, ie.listen_port, self.instance,
906                     component, dtp.data.src_io, dtp.data.src_ioargs,
907                     self.timeouts, self.src_cbs, private=dtp)
908     ie.loop.Add(de)
909
910     dtp.src_export = de
911
912   def ReportConnected(self, ie, dtp):
913     """Called when a connection has been established.
914
915     """
916     self.feedback_fn("%s is receiving data on %s" %
917                      (dtp.data.name, self.dest_node))
918
919   def ReportFinished(self, ie, dtp):
920     """Called when a transfer has finished.
921
922     """
923     if ie.success:
924       self.feedback_fn("%s finished receiving data" % dtp.data.name)
925     else:
926       self.feedback_fn("%s failed to receive data: %s (recent output: %s)" %
927                        (dtp.data.name, ie.final_message, ie.recent_output))
928
929     dtp.RecordResult(ie.success)
930
931     # TODO: Check whether sending SIGTERM right away is okay, maybe we should
932     # give the daemon a moment to sort things out
933     if dtp.src_export and not ie.success:
934       dtp.src_export.Abort()
935
936
937 class DiskTransfer(object):
938   def __init__(self, name, src_io, src_ioargs, dest_io, dest_ioargs,
939                finished_fn):
940     """Initializes this class.
941
942     @type name: string
943     @param name: User-visible name for this transfer (e.g. "disk/0")
944     @param src_io: Source I/O type
945     @param src_ioargs: Source I/O arguments
946     @param dest_io: Destination I/O type
947     @param dest_ioargs: Destination I/O arguments
948     @type finished_fn: callable
949     @param finished_fn: Function called once transfer has finished
950
951     """
952     self.name = name
953
954     self.src_io = src_io
955     self.src_ioargs = src_ioargs
956
957     self.dest_io = dest_io
958     self.dest_ioargs = dest_ioargs
959
960     self.finished_fn = finished_fn
961
962
963 class _DiskTransferPrivate(object):
964   def __init__(self, data, success, export_opts):
965     """Initializes this class.
966
967     @type data: L{DiskTransfer}
968     @type success: bool
969
970     """
971     self.data = data
972     self.success = success
973     self.export_opts = export_opts
974
975     self.src_export = None
976     self.dest_import = None
977
978   def RecordResult(self, success):
979     """Updates the status.
980
981     One failed part will cause the whole transfer to fail.
982
983     """
984     self.success = self.success and success
985
986
987 def _GetInstDiskMagic(base, instance_name, index):
988   """Computes the magic value for a disk export or import.
989
990   @type base: string
991   @param base: Random seed value (can be the same for all disks of a transfer)
992   @type instance_name: string
993   @param instance_name: Name of instance
994   @type index: number
995   @param index: Disk index
996
997   """
998   h = compat.sha1_hash()
999   h.update(str(constants.RIE_VERSION))
1000   h.update(base)
1001   h.update(instance_name)
1002   h.update(str(index))
1003   return h.hexdigest()
1004
1005
1006 def TransferInstanceData(lu, feedback_fn, src_node, dest_node, dest_ip,
1007                          instance, all_transfers):
1008   """Transfers an instance's data from one node to another.
1009
1010   @param lu: Logical unit instance
1011   @param feedback_fn: Feedback function
1012   @type src_node: string
1013   @param src_node: Source node name
1014   @type dest_node: string
1015   @param dest_node: Destination node name
1016   @type dest_ip: string
1017   @param dest_ip: IP address of destination node
1018   @type instance: L{objects.Instance}
1019   @param instance: Instance object
1020   @type all_transfers: list of L{DiskTransfer} instances
1021   @param all_transfers: List of all disk transfers to be made
1022   @rtype: list
1023   @return: List with a boolean (True=successful, False=failed) for success for
1024            each transfer
1025
1026   """
1027   # Disable compression for all moves as these are all within the same cluster
1028   compress = constants.IEC_NONE
1029
1030   logging.debug("Source node %s, destination node %s, compression '%s'",
1031                 src_node, dest_node, compress)
1032
1033   timeouts = ImportExportTimeouts(constants.DISK_TRANSFER_CONNECT_TIMEOUT)
1034   src_cbs = _TransferInstSourceCb(lu, feedback_fn, instance, timeouts,
1035                                   src_node, None, dest_node, dest_ip)
1036   dest_cbs = _TransferInstDestCb(lu, feedback_fn, instance, timeouts,
1037                                  src_node, src_cbs, dest_node, dest_ip)
1038
1039   all_dtp = []
1040
1041   base_magic = utils.GenerateSecret(6)
1042
1043   ieloop = ImportExportLoop(lu)
1044   try:
1045     for idx, transfer in enumerate(all_transfers):
1046       if transfer:
1047         feedback_fn("Exporting %s from %s to %s" %
1048                     (transfer.name, src_node, dest_node))
1049
1050         magic = _GetInstDiskMagic(base_magic, instance.name, idx)
1051         opts = objects.ImportExportOptions(key_name=None, ca_pem=None,
1052                                            compress=compress, magic=magic)
1053
1054         dtp = _DiskTransferPrivate(transfer, True, opts)
1055
1056         di = DiskImport(lu, dest_node, opts, instance, "disk%d" % idx,
1057                         transfer.dest_io, transfer.dest_ioargs,
1058                         timeouts, dest_cbs, private=dtp)
1059         ieloop.Add(di)
1060
1061         dtp.dest_import = di
1062       else:
1063         dtp = _DiskTransferPrivate(None, False, None)
1064
1065       all_dtp.append(dtp)
1066
1067     ieloop.Run()
1068   finally:
1069     ieloop.FinalizeAll()
1070
1071   assert len(all_dtp) == len(all_transfers)
1072   assert compat.all((dtp.src_export is None or
1073                       dtp.src_export.success is not None) and
1074                      (dtp.dest_import is None or
1075                       dtp.dest_import.success is not None)
1076                      for dtp in all_dtp), \
1077          "Not all imports/exports are finalized"
1078
1079   return [bool(dtp.success) for dtp in all_dtp]
1080
1081
1082 class _RemoteExportCb(ImportExportCbBase):
1083   def __init__(self, feedback_fn, disk_count):
1084     """Initializes this class.
1085
1086     """
1087     ImportExportCbBase.__init__(self)
1088     self._feedback_fn = feedback_fn
1089     self._dresults = [None] * disk_count
1090
1091   @property
1092   def disk_results(self):
1093     """Returns per-disk results.
1094
1095     """
1096     return self._dresults
1097
1098   def ReportConnected(self, ie, private):
1099     """Called when a connection has been established.
1100
1101     """
1102     (idx, _) = private
1103
1104     self._feedback_fn("Disk %s is now sending data" % idx)
1105
1106   def ReportProgress(self, ie, private):
1107     """Called when new progress information should be reported.
1108
1109     """
1110     (idx, _) = private
1111
1112     progress = ie.progress
1113     if not progress:
1114       return
1115
1116     self._feedback_fn("Disk %s sent %s" % (idx, FormatProgress(progress)))
1117
1118   def ReportFinished(self, ie, private):
1119     """Called when a transfer has finished.
1120
1121     """
1122     (idx, finished_fn) = private
1123
1124     if ie.success:
1125       self._feedback_fn("Disk %s finished sending data" % idx)
1126     else:
1127       self._feedback_fn("Disk %s failed to send data: %s (recent output: %s)" %
1128                         (idx, ie.final_message, ie.recent_output))
1129
1130     self._dresults[idx] = bool(ie.success)
1131
1132     if finished_fn:
1133       finished_fn()
1134
1135
1136 class ExportInstanceHelper:
1137   def __init__(self, lu, feedback_fn, instance):
1138     """Initializes this class.
1139
1140     @param lu: Logical unit instance
1141     @param feedback_fn: Feedback function
1142     @type instance: L{objects.Instance}
1143     @param instance: Instance object
1144
1145     """
1146     self._lu = lu
1147     self._feedback_fn = feedback_fn
1148     self._instance = instance
1149
1150     self._snap_disks = []
1151     self._removed_snaps = [False] * len(instance.disks)
1152
1153   def CreateSnapshots(self):
1154     """Creates an LVM snapshot for every disk of the instance.
1155
1156     """
1157     assert not self._snap_disks
1158
1159     instance = self._instance
1160     src_node = instance.primary_node
1161
1162     for idx, disk in enumerate(instance.disks):
1163       self._feedback_fn("Creating a snapshot of disk/%s on node %s" %
1164                         (idx, src_node))
1165
1166       # result.payload will be a snapshot of an lvm leaf of the one we
1167       # passed
1168       result = self._lu.rpc.call_blockdev_snapshot(src_node, (disk, instance))
1169       new_dev = False
1170       msg = result.fail_msg
1171       if msg:
1172         self._lu.LogWarning("Could not snapshot disk/%s on node %s: %s",
1173                             idx, src_node, msg)
1174       elif (not isinstance(result.payload, (tuple, list)) or
1175             len(result.payload) != 2):
1176         self._lu.LogWarning("Could not snapshot disk/%s on node %s: invalid"
1177                             " result '%s'", idx, src_node, result.payload)
1178       else:
1179         disk_id = tuple(result.payload)
1180         disk_params = constants.DISK_LD_DEFAULTS[constants.LD_LV].copy()
1181         new_dev = objects.Disk(dev_type=constants.LD_LV, size=disk.size,
1182                                logical_id=disk_id, physical_id=disk_id,
1183                                iv_name=disk.iv_name,
1184                                params=disk_params)
1185
1186       self._snap_disks.append(new_dev)
1187
1188     assert len(self._snap_disks) == len(instance.disks)
1189     assert len(self._removed_snaps) == len(instance.disks)
1190
1191   def _RemoveSnapshot(self, disk_index):
1192     """Removes an LVM snapshot.
1193
1194     @type disk_index: number
1195     @param disk_index: Index of the snapshot to be removed
1196
1197     """
1198     disk = self._snap_disks[disk_index]
1199     if disk and not self._removed_snaps[disk_index]:
1200       src_node = self._instance.primary_node
1201
1202       self._feedback_fn("Removing snapshot of disk/%s on node %s" %
1203                         (disk_index, src_node))
1204
1205       result = self._lu.rpc.call_blockdev_remove(src_node, disk)
1206       if result.fail_msg:
1207         self._lu.LogWarning("Could not remove snapshot for disk/%d from node"
1208                             " %s: %s", disk_index, src_node, result.fail_msg)
1209       else:
1210         self._removed_snaps[disk_index] = True
1211
1212   def LocalExport(self, dest_node):
1213     """Intra-cluster instance export.
1214
1215     @type dest_node: L{objects.Node}
1216     @param dest_node: Destination node
1217
1218     """
1219     instance = self._instance
1220     src_node = instance.primary_node
1221
1222     assert len(self._snap_disks) == len(instance.disks)
1223
1224     transfers = []
1225
1226     for idx, dev in enumerate(self._snap_disks):
1227       if not dev:
1228         transfers.append(None)
1229         continue
1230
1231       path = utils.PathJoin(pathutils.EXPORT_DIR, "%s.new" % instance.name,
1232                             dev.physical_id[1])
1233
1234       finished_fn = compat.partial(self._TransferFinished, idx)
1235
1236       # FIXME: pass debug option from opcode to backend
1237       dt = DiskTransfer("snapshot/%s" % idx,
1238                         constants.IEIO_SCRIPT, (dev, idx),
1239                         constants.IEIO_FILE, (path, ),
1240                         finished_fn)
1241       transfers.append(dt)
1242
1243     # Actually export data
1244     dresults = TransferInstanceData(self._lu, self._feedback_fn,
1245                                     src_node, dest_node.name,
1246                                     dest_node.secondary_ip,
1247                                     instance, transfers)
1248
1249     assert len(dresults) == len(instance.disks)
1250
1251     self._feedback_fn("Finalizing export on %s" % dest_node.name)
1252     result = self._lu.rpc.call_finalize_export(dest_node.name, instance,
1253                                                self._snap_disks)
1254     msg = result.fail_msg
1255     fin_resu = not msg
1256     if msg:
1257       self._lu.LogWarning("Could not finalize export for instance %s"
1258                           " on node %s: %s", instance.name, dest_node.name, msg)
1259
1260     return (fin_resu, dresults)
1261
1262   def RemoteExport(self, disk_info, key_name, dest_ca_pem, timeouts):
1263     """Inter-cluster instance export.
1264
1265     @type disk_info: list
1266     @param disk_info: Per-disk destination information
1267     @type key_name: string
1268     @param key_name: Name of X509 key to use
1269     @type dest_ca_pem: string
1270     @param dest_ca_pem: Destination X509 CA in PEM format
1271     @type timeouts: L{ImportExportTimeouts}
1272     @param timeouts: Timeouts for this import
1273
1274     """
1275     instance = self._instance
1276
1277     assert len(disk_info) == len(instance.disks)
1278
1279     cbs = _RemoteExportCb(self._feedback_fn, len(instance.disks))
1280
1281     ieloop = ImportExportLoop(self._lu)
1282     try:
1283       for idx, (dev, (host, port, magic)) in enumerate(zip(instance.disks,
1284                                                            disk_info)):
1285         # Decide whether to use IPv6
1286         ipv6 = netutils.IP6Address.IsValid(host)
1287
1288         opts = objects.ImportExportOptions(key_name=key_name,
1289                                            ca_pem=dest_ca_pem,
1290                                            magic=magic, ipv6=ipv6)
1291
1292         self._feedback_fn("Sending disk %s to %s:%s" % (idx, host, port))
1293         finished_fn = compat.partial(self._TransferFinished, idx)
1294         ieloop.Add(DiskExport(self._lu, instance.primary_node,
1295                               opts, host, port, instance, "disk%d" % idx,
1296                               constants.IEIO_SCRIPT, (dev, idx),
1297                               timeouts, cbs, private=(idx, finished_fn)))
1298
1299       ieloop.Run()
1300     finally:
1301       ieloop.FinalizeAll()
1302
1303     return (True, cbs.disk_results)
1304
1305   def _TransferFinished(self, idx):
1306     """Called once a transfer has finished.
1307
1308     @type idx: number
1309     @param idx: Disk index
1310
1311     """
1312     logging.debug("Transfer %s finished", idx)
1313     self._RemoveSnapshot(idx)
1314
1315   def Cleanup(self):
1316     """Remove all snapshots.
1317
1318     """
1319     assert len(self._removed_snaps) == len(self._instance.disks)
1320     for idx in range(len(self._instance.disks)):
1321       self._RemoveSnapshot(idx)
1322
1323
1324 class _RemoteImportCb(ImportExportCbBase):
1325   def __init__(self, feedback_fn, cds, x509_cert_pem, disk_count,
1326                external_address):
1327     """Initializes this class.
1328
1329     @type cds: string
1330     @param cds: Cluster domain secret
1331     @type x509_cert_pem: string
1332     @param x509_cert_pem: CA used for signing import key
1333     @type disk_count: number
1334     @param disk_count: Number of disks
1335     @type external_address: string
1336     @param external_address: External address of destination node
1337
1338     """
1339     ImportExportCbBase.__init__(self)
1340     self._feedback_fn = feedback_fn
1341     self._cds = cds
1342     self._x509_cert_pem = x509_cert_pem
1343     self._disk_count = disk_count
1344     self._external_address = external_address
1345
1346     self._dresults = [None] * disk_count
1347     self._daemon_port = [None] * disk_count
1348
1349     self._salt = utils.GenerateSecret(8)
1350
1351   @property
1352   def disk_results(self):
1353     """Returns per-disk results.
1354
1355     """
1356     return self._dresults
1357
1358   def _CheckAllListening(self):
1359     """Checks whether all daemons are listening.
1360
1361     If all daemons are listening, the information is sent to the client.
1362
1363     """
1364     if not compat.all(dp is not None for dp in self._daemon_port):
1365       return
1366
1367     host = self._external_address
1368
1369     disks = []
1370     for idx, (port, magic) in enumerate(self._daemon_port):
1371       disks.append(ComputeRemoteImportDiskInfo(self._cds, self._salt,
1372                                                idx, host, port, magic))
1373
1374     assert len(disks) == self._disk_count
1375
1376     self._feedback_fn(constants.ELOG_REMOTE_IMPORT, {
1377       "disks": disks,
1378       "x509_ca": self._x509_cert_pem,
1379       })
1380
1381   def ReportListening(self, ie, private, _):
1382     """Called when daemon started listening.
1383
1384     """
1385     (idx, ) = private
1386
1387     self._feedback_fn("Disk %s is now listening" % idx)
1388
1389     assert self._daemon_port[idx] is None
1390
1391     self._daemon_port[idx] = (ie.listen_port, ie.magic)
1392
1393     self._CheckAllListening()
1394
1395   def ReportConnected(self, ie, private):
1396     """Called when a connection has been established.
1397
1398     """
1399     (idx, ) = private
1400
1401     self._feedback_fn("Disk %s is now receiving data" % idx)
1402
1403   def ReportFinished(self, ie, private):
1404     """Called when a transfer has finished.
1405
1406     """
1407     (idx, ) = private
1408
1409     # Daemon is certainly no longer listening
1410     self._daemon_port[idx] = None
1411
1412     if ie.success:
1413       self._feedback_fn("Disk %s finished receiving data" % idx)
1414     else:
1415       self._feedback_fn(("Disk %s failed to receive data: %s"
1416                          " (recent output: %s)") %
1417                         (idx, ie.final_message, ie.recent_output))
1418
1419     self._dresults[idx] = bool(ie.success)
1420
1421
1422 def RemoteImport(lu, feedback_fn, instance, pnode, source_x509_ca,
1423                  cds, timeouts):
1424   """Imports an instance from another cluster.
1425
1426   @param lu: Logical unit instance
1427   @param feedback_fn: Feedback function
1428   @type instance: L{objects.Instance}
1429   @param instance: Instance object
1430   @type pnode: L{objects.Node}
1431   @param pnode: Primary node of instance as an object
1432   @type source_x509_ca: OpenSSL.crypto.X509
1433   @param source_x509_ca: Import source's X509 CA
1434   @type cds: string
1435   @param cds: Cluster domain secret
1436   @type timeouts: L{ImportExportTimeouts}
1437   @param timeouts: Timeouts for this import
1438
1439   """
1440   source_ca_pem = OpenSSL.crypto.dump_certificate(OpenSSL.crypto.FILETYPE_PEM,
1441                                                   source_x509_ca)
1442
1443   magic_base = utils.GenerateSecret(6)
1444
1445   # Decide whether to use IPv6
1446   ipv6 = netutils.IP6Address.IsValid(pnode.primary_ip)
1447
1448   # Create crypto key
1449   result = lu.rpc.call_x509_cert_create(instance.primary_node,
1450                                         constants.RIE_CERT_VALIDITY)
1451   result.Raise("Can't create X509 key and certificate on %s" % result.node)
1452
1453   (x509_key_name, x509_cert_pem) = result.payload
1454   try:
1455     # Load certificate
1456     x509_cert = OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_PEM,
1457                                                 x509_cert_pem)
1458
1459     # Sign certificate
1460     signed_x509_cert_pem = \
1461       utils.SignX509Certificate(x509_cert, cds, utils.GenerateSecret(8))
1462
1463     cbs = _RemoteImportCb(feedback_fn, cds, signed_x509_cert_pem,
1464                           len(instance.disks), pnode.primary_ip)
1465
1466     ieloop = ImportExportLoop(lu)
1467     try:
1468       for idx, dev in enumerate(instance.disks):
1469         magic = _GetInstDiskMagic(magic_base, instance.name, idx)
1470
1471         # Import daemon options
1472         opts = objects.ImportExportOptions(key_name=x509_key_name,
1473                                            ca_pem=source_ca_pem,
1474                                            magic=magic, ipv6=ipv6)
1475
1476         ieloop.Add(DiskImport(lu, instance.primary_node, opts, instance,
1477                               "disk%d" % idx,
1478                               constants.IEIO_SCRIPT, (dev, idx),
1479                               timeouts, cbs, private=(idx, )))
1480
1481       ieloop.Run()
1482     finally:
1483       ieloop.FinalizeAll()
1484   finally:
1485     # Remove crypto key and certificate
1486     result = lu.rpc.call_x509_cert_remove(instance.primary_node, x509_key_name)
1487     result.Raise("Can't remove X509 key and certificate on %s" % result.node)
1488
1489   return cbs.disk_results
1490
1491
1492 def _GetImportExportHandshakeMessage(version):
1493   """Returns the handshake message for a RIE protocol version.
1494
1495   @type version: number
1496
1497   """
1498   return "%s:%s" % (version, constants.RIE_HANDSHAKE)
1499
1500
1501 def ComputeRemoteExportHandshake(cds):
1502   """Computes the remote import/export handshake.
1503
1504   @type cds: string
1505   @param cds: Cluster domain secret
1506
1507   """
1508   salt = utils.GenerateSecret(8)
1509   msg = _GetImportExportHandshakeMessage(constants.RIE_VERSION)
1510   return (constants.RIE_VERSION, utils.Sha1Hmac(cds, msg, salt=salt), salt)
1511
1512
1513 def CheckRemoteExportHandshake(cds, handshake):
1514   """Checks the handshake of a remote import/export.
1515
1516   @type cds: string
1517   @param cds: Cluster domain secret
1518   @type handshake: sequence
1519   @param handshake: Handshake sent by remote peer
1520
1521   """
1522   try:
1523     (version, hmac_digest, hmac_salt) = handshake
1524   except (TypeError, ValueError), err:
1525     return "Invalid data: %s" % err
1526
1527   if not utils.VerifySha1Hmac(cds, _GetImportExportHandshakeMessage(version),
1528                               hmac_digest, salt=hmac_salt):
1529     return "Hash didn't match, clusters don't share the same domain secret"
1530
1531   if version != constants.RIE_VERSION:
1532     return ("Clusters don't have the same remote import/export protocol"
1533             " (local=%s, remote=%s)" %
1534             (constants.RIE_VERSION, version))
1535
1536   return None
1537
1538
1539 def _GetRieDiskInfoMessage(disk_index, host, port, magic):
1540   """Returns the hashed text for import/export disk information.
1541
1542   @type disk_index: number
1543   @param disk_index: Index of disk (included in hash)
1544   @type host: string
1545   @param host: Hostname
1546   @type port: number
1547   @param port: Daemon port
1548   @type magic: string
1549   @param magic: Magic value
1550
1551   """
1552   return "%s:%s:%s:%s" % (disk_index, host, port, magic)
1553
1554
1555 def CheckRemoteExportDiskInfo(cds, disk_index, disk_info):
1556   """Verifies received disk information for an export.
1557
1558   @type cds: string
1559   @param cds: Cluster domain secret
1560   @type disk_index: number
1561   @param disk_index: Index of disk (included in hash)
1562   @type disk_info: sequence
1563   @param disk_info: Disk information sent by remote peer
1564
1565   """
1566   try:
1567     (host, port, magic, hmac_digest, hmac_salt) = disk_info
1568   except (TypeError, ValueError), err:
1569     raise errors.GenericError("Invalid data: %s" % err)
1570
1571   if not (host and port and magic):
1572     raise errors.GenericError("Missing destination host, port or magic")
1573
1574   msg = _GetRieDiskInfoMessage(disk_index, host, port, magic)
1575
1576   if not utils.VerifySha1Hmac(cds, msg, hmac_digest, salt=hmac_salt):
1577     raise errors.GenericError("HMAC is wrong")
1578
1579   if netutils.IP6Address.IsValid(host) or netutils.IP4Address.IsValid(host):
1580     destination = host
1581   else:
1582     destination = netutils.Hostname.GetNormalizedName(host)
1583
1584   return (destination,
1585           utils.ValidateServiceName(port),
1586           magic)
1587
1588
1589 def ComputeRemoteImportDiskInfo(cds, salt, disk_index, host, port, magic):
1590   """Computes the signed disk information for a remote import.
1591
1592   @type cds: string
1593   @param cds: Cluster domain secret
1594   @type salt: string
1595   @param salt: HMAC salt
1596   @type disk_index: number
1597   @param disk_index: Index of disk (included in hash)
1598   @type host: string
1599   @param host: Hostname
1600   @type port: number
1601   @param port: Daemon port
1602   @type magic: string
1603   @param magic: Magic value
1604
1605   """
1606   msg = _GetRieDiskInfoMessage(disk_index, host, port, magic)
1607   hmac_digest = utils.Sha1Hmac(cds, msg, salt=salt)
1608   return (host, port, magic, hmac_digest, salt)
1609
1610
1611 def CalculateGroupIPolicy(cluster, group):
1612   """Calculate instance policy for group.
1613
1614   """
1615   return cluster.SimpleFillIPolicy(group.ipolicy)
1616
1617
1618 def ComputeDiskSize(disk_template, disks):
1619   """Compute disk size requirements according to disk template
1620
1621   """
1622   # Required free disk space as a function of disk and swap space
1623   req_size_dict = {
1624     constants.DT_DISKLESS: None,
1625     constants.DT_PLAIN: sum(d[constants.IDISK_SIZE] for d in disks),
1626     # 128 MB are added for drbd metadata for each disk
1627     constants.DT_DRBD8:
1628       sum(d[constants.IDISK_SIZE] + constants.DRBD_META_SIZE for d in disks),
1629     constants.DT_FILE: sum(d[constants.IDISK_SIZE] for d in disks),
1630     constants.DT_SHARED_FILE: sum(d[constants.IDISK_SIZE] for d in disks),
1631     constants.DT_BLOCK: 0,
1632     constants.DT_RBD: sum(d[constants.IDISK_SIZE] for d in disks),
1633   }
1634
1635   if disk_template not in req_size_dict:
1636     raise errors.ProgrammerError("Disk template '%s' size requirement"
1637                                  " is unknown" % disk_template)
1638
1639   return req_size_dict[disk_template]