Replace LD_* constants with DT_* constants
[ganeti-local] / lib / masterd / instance.py
1 #
2 #
3
4 # Copyright (C) 2010, 2011 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Instance-related functions and classes for masterd.
23
24 """
25
26 import logging
27 import time
28 import OpenSSL
29
30 from ganeti import constants
31 from ganeti import errors
32 from ganeti import compat
33 from ganeti import utils
34 from ganeti import objects
35 from ganeti import netutils
36 from ganeti import pathutils
37
38
39 class _ImportExportError(Exception):
40   """Local exception to report import/export errors.
41
42   """
43
44
45 class ImportExportTimeouts(object):
46   #: Time until daemon starts writing status file
47   DEFAULT_READY_TIMEOUT = 10
48
49   #: Length of time until errors cause hard failure
50   DEFAULT_ERROR_TIMEOUT = 10
51
52   #: Time after which daemon must be listening
53   DEFAULT_LISTEN_TIMEOUT = 10
54
55   #: Progress update interval
56   DEFAULT_PROGRESS_INTERVAL = 60
57
58   __slots__ = [
59     "error",
60     "ready",
61     "listen",
62     "connect",
63     "progress",
64     ]
65
66   def __init__(self, connect,
67                listen=DEFAULT_LISTEN_TIMEOUT,
68                error=DEFAULT_ERROR_TIMEOUT,
69                ready=DEFAULT_READY_TIMEOUT,
70                progress=DEFAULT_PROGRESS_INTERVAL):
71     """Initializes this class.
72
73     @type connect: number
74     @param connect: Timeout for establishing connection
75     @type listen: number
76     @param listen: Timeout for starting to listen for connections
77     @type error: number
78     @param error: Length of time until errors cause hard failure
79     @type ready: number
80     @param ready: Timeout for daemon to become ready
81     @type progress: number
82     @param progress: Progress update interval
83
84     """
85     self.error = error
86     self.ready = ready
87     self.listen = listen
88     self.connect = connect
89     self.progress = progress
90
91
92 class ImportExportCbBase(object):
93   """Callbacks for disk import/export.
94
95   """
96   def ReportListening(self, ie, private, component):
97     """Called when daemon started listening.
98
99     @type ie: Subclass of L{_DiskImportExportBase}
100     @param ie: Import/export object
101     @param private: Private data passed to import/export object
102     @param component: transfer component name
103
104     """
105
106   def ReportConnected(self, ie, private):
107     """Called when a connection has been established.
108
109     @type ie: Subclass of L{_DiskImportExportBase}
110     @param ie: Import/export object
111     @param private: Private data passed to import/export object
112
113     """
114
115   def ReportProgress(self, ie, private):
116     """Called when new progress information should be reported.
117
118     @type ie: Subclass of L{_DiskImportExportBase}
119     @param ie: Import/export object
120     @param private: Private data passed to import/export object
121
122     """
123
124   def ReportFinished(self, ie, private):
125     """Called when a transfer has finished.
126
127     @type ie: Subclass of L{_DiskImportExportBase}
128     @param ie: Import/export object
129     @param private: Private data passed to import/export object
130
131     """
132
133
134 class _DiskImportExportBase(object):
135   MODE_TEXT = None
136
137   def __init__(self, lu, node_uuid, opts,
138                instance, component, timeouts, cbs, private=None):
139     """Initializes this class.
140
141     @param lu: Logical unit instance
142     @type node_uuid: string
143     @param node_uuid: Node UUID for import
144     @type opts: L{objects.ImportExportOptions}
145     @param opts: Import/export daemon options
146     @type instance: L{objects.Instance}
147     @param instance: Instance object
148     @type component: string
149     @param component: which part of the instance is being imported
150     @type timeouts: L{ImportExportTimeouts}
151     @param timeouts: Timeouts for this import
152     @type cbs: L{ImportExportCbBase}
153     @param cbs: Callbacks
154     @param private: Private data for callback functions
155
156     """
157     assert self.MODE_TEXT
158
159     self._lu = lu
160     self.node_uuid = node_uuid
161     self.node_name = lu.cfg.GetNodeName(node_uuid)
162     self._opts = opts.Copy()
163     self._instance = instance
164     self._component = component
165     self._timeouts = timeouts
166     self._cbs = cbs
167     self._private = private
168
169     # Set master daemon's timeout in options for import/export daemon
170     assert self._opts.connect_timeout is None
171     self._opts.connect_timeout = timeouts.connect
172
173     # Parent loop
174     self._loop = None
175
176     # Timestamps
177     self._ts_begin = None
178     self._ts_connected = None
179     self._ts_finished = None
180     self._ts_cleanup = None
181     self._ts_last_progress = None
182     self._ts_last_error = None
183
184     # Transfer status
185     self.success = None
186     self.final_message = None
187
188     # Daemon status
189     self._daemon_name = None
190     self._daemon = None
191
192   @property
193   def recent_output(self):
194     """Returns the most recent output from the daemon.
195
196     """
197     if self._daemon:
198       return "\n".join(self._daemon.recent_output)
199
200     return None
201
202   @property
203   def progress(self):
204     """Returns transfer progress information.
205
206     """
207     if not self._daemon:
208       return None
209
210     return (self._daemon.progress_mbytes,
211             self._daemon.progress_throughput,
212             self._daemon.progress_percent,
213             self._daemon.progress_eta)
214
215   @property
216   def magic(self):
217     """Returns the magic value for this import/export.
218
219     """
220     return self._opts.magic
221
222   @property
223   def active(self):
224     """Determines whether this transport is still active.
225
226     """
227     return self.success is None
228
229   @property
230   def loop(self):
231     """Returns parent loop.
232
233     @rtype: L{ImportExportLoop}
234
235     """
236     return self._loop
237
238   def SetLoop(self, loop):
239     """Sets the parent loop.
240
241     @type loop: L{ImportExportLoop}
242
243     """
244     if self._loop:
245       raise errors.ProgrammerError("Loop can only be set once")
246
247     self._loop = loop
248
249   def _StartDaemon(self):
250     """Starts the import/export daemon.
251
252     """
253     raise NotImplementedError()
254
255   def CheckDaemon(self):
256     """Checks whether daemon has been started and if not, starts it.
257
258     @rtype: string
259     @return: Daemon name
260
261     """
262     assert self._ts_cleanup is None
263
264     if self._daemon_name is None:
265       assert self._ts_begin is None
266
267       result = self._StartDaemon()
268       if result.fail_msg:
269         raise _ImportExportError("Failed to start %s on %s: %s" %
270                                  (self.MODE_TEXT, self.node_name,
271                                   result.fail_msg))
272
273       daemon_name = result.payload
274
275       logging.info("Started %s '%s' on %s", self.MODE_TEXT, daemon_name,
276                    self.node_name)
277
278       self._ts_begin = time.time()
279       self._daemon_name = daemon_name
280
281     return self._daemon_name
282
283   def GetDaemonName(self):
284     """Returns the daemon name.
285
286     """
287     assert self._daemon_name, "Daemon has not been started"
288     assert self._ts_cleanup is None
289     return self._daemon_name
290
291   def Abort(self):
292     """Sends SIGTERM to import/export daemon (if still active).
293
294     """
295     if self._daemon_name:
296       self._lu.LogWarning("Aborting %s '%s' on %s",
297                           self.MODE_TEXT, self._daemon_name, self.node_uuid)
298       result = self._lu.rpc.call_impexp_abort(self.node_uuid, self._daemon_name)
299       if result.fail_msg:
300         self._lu.LogWarning("Failed to abort %s '%s' on %s: %s",
301                             self.MODE_TEXT, self._daemon_name,
302                             self.node_uuid, result.fail_msg)
303         return False
304
305     return True
306
307   def _SetDaemonData(self, data):
308     """Internal function for updating status daemon data.
309
310     @type data: L{objects.ImportExportStatus}
311     @param data: Daemon status data
312
313     """
314     assert self._ts_begin is not None
315
316     if not data:
317       if utils.TimeoutExpired(self._ts_begin, self._timeouts.ready):
318         raise _ImportExportError("Didn't become ready after %s seconds" %
319                                  self._timeouts.ready)
320
321       return False
322
323     self._daemon = data
324
325     return True
326
327   def SetDaemonData(self, success, data):
328     """Updates daemon status data.
329
330     @type success: bool
331     @param success: Whether fetching data was successful or not
332     @type data: L{objects.ImportExportStatus}
333     @param data: Daemon status data
334
335     """
336     if not success:
337       if self._ts_last_error is None:
338         self._ts_last_error = time.time()
339
340       elif utils.TimeoutExpired(self._ts_last_error, self._timeouts.error):
341         raise _ImportExportError("Too many errors while updating data")
342
343       return False
344
345     self._ts_last_error = None
346
347     return self._SetDaemonData(data)
348
349   def CheckListening(self):
350     """Checks whether the daemon is listening.
351
352     """
353     raise NotImplementedError()
354
355   def _GetConnectedCheckEpoch(self):
356     """Returns timeout to calculate connect timeout.
357
358     """
359     raise NotImplementedError()
360
361   def CheckConnected(self):
362     """Checks whether the daemon is connected.
363
364     @rtype: bool
365     @return: Whether the daemon is connected
366
367     """
368     assert self._daemon, "Daemon status missing"
369
370     if self._ts_connected is not None:
371       return True
372
373     if self._daemon.connected:
374       self._ts_connected = time.time()
375
376       # TODO: Log remote peer
377       logging.debug("%s '%s' on %s is now connected",
378                     self.MODE_TEXT, self._daemon_name, self.node_uuid)
379
380       self._cbs.ReportConnected(self, self._private)
381
382       return True
383
384     if utils.TimeoutExpired(self._GetConnectedCheckEpoch(),
385                             self._timeouts.connect):
386       raise _ImportExportError("Not connected after %s seconds" %
387                                self._timeouts.connect)
388
389     return False
390
391   def _CheckProgress(self):
392     """Checks whether a progress update should be reported.
393
394     """
395     if ((self._ts_last_progress is None or
396         utils.TimeoutExpired(self._ts_last_progress,
397                              self._timeouts.progress)) and
398         self._daemon and
399         self._daemon.progress_mbytes is not None and
400         self._daemon.progress_throughput is not None):
401       self._cbs.ReportProgress(self, self._private)
402       self._ts_last_progress = time.time()
403
404   def CheckFinished(self):
405     """Checks whether the daemon exited.
406
407     @rtype: bool
408     @return: Whether the transfer is finished
409
410     """
411     assert self._daemon, "Daemon status missing"
412
413     if self._ts_finished:
414       return True
415
416     if self._daemon.exit_status is None:
417       # TODO: Adjust delay for ETA expiring soon
418       self._CheckProgress()
419       return False
420
421     self._ts_finished = time.time()
422
423     self._ReportFinished(self._daemon.exit_status == 0,
424                          self._daemon.error_message)
425
426     return True
427
428   def _ReportFinished(self, success, message):
429     """Transfer is finished or daemon exited.
430
431     @type success: bool
432     @param success: Whether the transfer was successful
433     @type message: string
434     @param message: Error message
435
436     """
437     assert self.success is None
438
439     self.success = success
440     self.final_message = message
441
442     if success:
443       logging.info("%s '%s' on %s succeeded", self.MODE_TEXT,
444                    self._daemon_name, self.node_uuid)
445     elif self._daemon_name:
446       self._lu.LogWarning("%s '%s' on %s failed: %s",
447                           self.MODE_TEXT, self._daemon_name,
448                           self._lu.cfg.GetNodeName(self.node_uuid),
449                           message)
450     else:
451       self._lu.LogWarning("%s on %s failed: %s", self.MODE_TEXT,
452                           self._lu.cfg.GetNodeName(self.node_uuid), message)
453
454     self._cbs.ReportFinished(self, self._private)
455
456   def _Finalize(self):
457     """Makes the RPC call to finalize this import/export.
458
459     """
460     return self._lu.rpc.call_impexp_cleanup(self.node_uuid, self._daemon_name)
461
462   def Finalize(self, error=None):
463     """Finalizes this import/export.
464
465     """
466     if self._daemon_name:
467       logging.info("Finalizing %s '%s' on %s",
468                    self.MODE_TEXT, self._daemon_name, self.node_uuid)
469
470       result = self._Finalize()
471       if result.fail_msg:
472         self._lu.LogWarning("Failed to finalize %s '%s' on %s: %s",
473                             self.MODE_TEXT, self._daemon_name,
474                             self.node_uuid, result.fail_msg)
475         return False
476
477       # Daemon is no longer running
478       self._daemon_name = None
479       self._ts_cleanup = time.time()
480
481     if error:
482       self._ReportFinished(False, error)
483
484     return True
485
486
487 class DiskImport(_DiskImportExportBase):
488   MODE_TEXT = "import"
489
490   def __init__(self, lu, node_uuid, opts, instance, component,
491                dest, dest_args, timeouts, cbs, private=None):
492     """Initializes this class.
493
494     @param lu: Logical unit instance
495     @type node_uuid: string
496     @param node_uuid: Node name for import
497     @type opts: L{objects.ImportExportOptions}
498     @param opts: Import/export daemon options
499     @type instance: L{objects.Instance}
500     @param instance: Instance object
501     @type component: string
502     @param component: which part of the instance is being imported
503     @param dest: I/O destination
504     @param dest_args: I/O arguments
505     @type timeouts: L{ImportExportTimeouts}
506     @param timeouts: Timeouts for this import
507     @type cbs: L{ImportExportCbBase}
508     @param cbs: Callbacks
509     @param private: Private data for callback functions
510
511     """
512     _DiskImportExportBase.__init__(self, lu, node_uuid, opts, instance,
513                                    component, timeouts, cbs, private)
514     self._dest = dest
515     self._dest_args = dest_args
516
517     # Timestamps
518     self._ts_listening = None
519
520   @property
521   def listen_port(self):
522     """Returns the port the daemon is listening on.
523
524     """
525     if self._daemon:
526       return self._daemon.listen_port
527
528     return None
529
530   def _StartDaemon(self):
531     """Starts the import daemon.
532
533     """
534     return self._lu.rpc.call_import_start(self.node_uuid, self._opts,
535                                           self._instance, self._component,
536                                           (self._dest, self._dest_args))
537
538   def CheckListening(self):
539     """Checks whether the daemon is listening.
540
541     @rtype: bool
542     @return: Whether the daemon is listening
543
544     """
545     assert self._daemon, "Daemon status missing"
546
547     if self._ts_listening is not None:
548       return True
549
550     port = self._daemon.listen_port
551     if port is not None:
552       self._ts_listening = time.time()
553
554       logging.debug("Import '%s' on %s is now listening on port %s",
555                     self._daemon_name, self.node_uuid, port)
556
557       self._cbs.ReportListening(self, self._private, self._component)
558
559       return True
560
561     if utils.TimeoutExpired(self._ts_begin, self._timeouts.listen):
562       raise _ImportExportError("Not listening after %s seconds" %
563                                self._timeouts.listen)
564
565     return False
566
567   def _GetConnectedCheckEpoch(self):
568     """Returns the time since we started listening.
569
570     """
571     assert self._ts_listening is not None, \
572            ("Checking whether an import is connected is only useful"
573             " once it's been listening")
574
575     return self._ts_listening
576
577
578 class DiskExport(_DiskImportExportBase):
579   MODE_TEXT = "export"
580
581   def __init__(self, lu, node_uuid, opts, dest_host, dest_port,
582                instance, component, source, source_args,
583                timeouts, cbs, private=None):
584     """Initializes this class.
585
586     @param lu: Logical unit instance
587     @type node_uuid: string
588     @param node_uuid: Node UUID for import
589     @type opts: L{objects.ImportExportOptions}
590     @param opts: Import/export daemon options
591     @type dest_host: string
592     @param dest_host: Destination host name or IP address
593     @type dest_port: number
594     @param dest_port: Destination port number
595     @type instance: L{objects.Instance}
596     @param instance: Instance object
597     @type component: string
598     @param component: which part of the instance is being imported
599     @param source: I/O source
600     @param source_args: I/O source
601     @type timeouts: L{ImportExportTimeouts}
602     @param timeouts: Timeouts for this import
603     @type cbs: L{ImportExportCbBase}
604     @param cbs: Callbacks
605     @param private: Private data for callback functions
606
607     """
608     _DiskImportExportBase.__init__(self, lu, node_uuid, opts, instance,
609                                    component, timeouts, cbs, private)
610     self._dest_host = dest_host
611     self._dest_port = dest_port
612     self._source = source
613     self._source_args = source_args
614
615   def _StartDaemon(self):
616     """Starts the export daemon.
617
618     """
619     return self._lu.rpc.call_export_start(self.node_uuid, self._opts,
620                                           self._dest_host, self._dest_port,
621                                           self._instance, self._component,
622                                           (self._source, self._source_args))
623
624   def CheckListening(self):
625     """Checks whether the daemon is listening.
626
627     """
628     # Only an import can be listening
629     return True
630
631   def _GetConnectedCheckEpoch(self):
632     """Returns the time since the daemon started.
633
634     """
635     assert self._ts_begin is not None
636
637     return self._ts_begin
638
639
640 def FormatProgress(progress):
641   """Formats progress information for user consumption
642
643   """
644   (mbytes, throughput, percent, eta) = progress
645
646   parts = [
647     utils.FormatUnit(mbytes, "h"),
648
649     # Not using FormatUnit as it doesn't support kilobytes
650     "%0.1f MiB/s" % throughput,
651     ]
652
653   if percent is not None:
654     parts.append("%d%%" % percent)
655
656   if eta is not None:
657     parts.append("ETA %s" % utils.FormatSeconds(eta))
658
659   return utils.CommaJoin(parts)
660
661
662 class ImportExportLoop:
663   MIN_DELAY = 1.0
664   MAX_DELAY = 20.0
665
666   def __init__(self, lu):
667     """Initializes this class.
668
669     """
670     self._lu = lu
671     self._queue = []
672     self._pending_add = []
673
674   def Add(self, diskie):
675     """Adds an import/export object to the loop.
676
677     @type diskie: Subclass of L{_DiskImportExportBase}
678     @param diskie: Import/export object
679
680     """
681     assert diskie not in self._pending_add
682     assert diskie.loop is None
683
684     diskie.SetLoop(self)
685
686     # Adding new objects to a staging list is necessary, otherwise the main
687     # loop gets confused if callbacks modify the queue while the main loop is
688     # iterating over it.
689     self._pending_add.append(diskie)
690
691   @staticmethod
692   def _CollectDaemonStatus(lu, daemons):
693     """Collects the status for all import/export daemons.
694
695     """
696     daemon_status = {}
697
698     for node_name, names in daemons.iteritems():
699       result = lu.rpc.call_impexp_status(node_name, names)
700       if result.fail_msg:
701         lu.LogWarning("Failed to get daemon status on %s: %s",
702                       node_name, result.fail_msg)
703         continue
704
705       assert len(names) == len(result.payload)
706
707       daemon_status[node_name] = dict(zip(names, result.payload))
708
709     return daemon_status
710
711   @staticmethod
712   def _GetActiveDaemonNames(queue):
713     """Gets the names of all active daemons.
714
715     """
716     result = {}
717     for diskie in queue:
718       if not diskie.active:
719         continue
720
721       try:
722         # Start daemon if necessary
723         daemon_name = diskie.CheckDaemon()
724       except _ImportExportError, err:
725         logging.exception("%s failed", diskie.MODE_TEXT)
726         diskie.Finalize(error=str(err))
727         continue
728
729       result.setdefault(diskie.node_name, []).append(daemon_name)
730
731     assert len(queue) >= len(result)
732     assert len(queue) >= sum([len(names) for names in result.itervalues()])
733
734     logging.debug("daemons=%r", result)
735
736     return result
737
738   def _AddPendingToQueue(self):
739     """Adds all pending import/export objects to the internal queue.
740
741     """
742     assert compat.all(diskie not in self._queue and diskie.loop == self
743                       for diskie in self._pending_add)
744
745     self._queue.extend(self._pending_add)
746
747     del self._pending_add[:]
748
749   def Run(self):
750     """Utility main loop.
751
752     """
753     while True:
754       self._AddPendingToQueue()
755
756       # Collect all active daemon names
757       daemons = self._GetActiveDaemonNames(self._queue)
758       if not daemons:
759         break
760
761       # Collection daemon status data
762       data = self._CollectDaemonStatus(self._lu, daemons)
763
764       # Use data
765       delay = self.MAX_DELAY
766       for diskie in self._queue:
767         if not diskie.active:
768           continue
769
770         try:
771           try:
772             all_daemon_data = data[diskie.node_name]
773           except KeyError:
774             result = diskie.SetDaemonData(False, None)
775           else:
776             result = \
777               diskie.SetDaemonData(True,
778                                    all_daemon_data[diskie.GetDaemonName()])
779
780           if not result:
781             # Daemon not yet ready, retry soon
782             delay = min(3.0, delay)
783             continue
784
785           if diskie.CheckFinished():
786             # Transfer finished
787             diskie.Finalize()
788             continue
789
790           # Normal case: check again in 5 seconds
791           delay = min(5.0, delay)
792
793           if not diskie.CheckListening():
794             # Not yet listening, retry soon
795             delay = min(1.0, delay)
796             continue
797
798           if not diskie.CheckConnected():
799             # Not yet connected, retry soon
800             delay = min(1.0, delay)
801             continue
802
803         except _ImportExportError, err:
804           logging.exception("%s failed", diskie.MODE_TEXT)
805           diskie.Finalize(error=str(err))
806
807       if not compat.any(diskie.active for diskie in self._queue):
808         break
809
810       # Wait a bit
811       delay = min(self.MAX_DELAY, max(self.MIN_DELAY, delay))
812       logging.debug("Waiting for %ss", delay)
813       time.sleep(delay)
814
815   def FinalizeAll(self):
816     """Finalizes all pending transfers.
817
818     """
819     success = True
820
821     for diskie in self._queue:
822       success = diskie.Finalize() and success
823
824     return success
825
826
827 class _TransferInstCbBase(ImportExportCbBase):
828   def __init__(self, lu, feedback_fn, instance, timeouts, src_node_uuid,
829                src_cbs, dest_node_uuid, dest_ip):
830     """Initializes this class.
831
832     """
833     ImportExportCbBase.__init__(self)
834
835     self.lu = lu
836     self.feedback_fn = feedback_fn
837     self.instance = instance
838     self.timeouts = timeouts
839     self.src_node_uuid = src_node_uuid
840     self.src_cbs = src_cbs
841     self.dest_node_uuid = dest_node_uuid
842     self.dest_ip = dest_ip
843
844
845 class _TransferInstSourceCb(_TransferInstCbBase):
846   def ReportConnected(self, ie, dtp):
847     """Called when a connection has been established.
848
849     """
850     assert self.src_cbs is None
851     assert dtp.src_export == ie
852     assert dtp.dest_import
853
854     self.feedback_fn("%s is sending data on %s" %
855                      (dtp.data.name, ie.node_name))
856
857   def ReportProgress(self, ie, dtp):
858     """Called when new progress information should be reported.
859
860     """
861     progress = ie.progress
862     if not progress:
863       return
864
865     self.feedback_fn("%s sent %s" % (dtp.data.name, FormatProgress(progress)))
866
867   def ReportFinished(self, ie, dtp):
868     """Called when a transfer has finished.
869
870     """
871     assert self.src_cbs is None
872     assert dtp.src_export == ie
873     assert dtp.dest_import
874
875     if ie.success:
876       self.feedback_fn("%s finished sending data" % dtp.data.name)
877     else:
878       self.feedback_fn("%s failed to send data: %s (recent output: %s)" %
879                        (dtp.data.name, ie.final_message, ie.recent_output))
880
881     dtp.RecordResult(ie.success)
882
883     cb = dtp.data.finished_fn
884     if cb:
885       cb()
886
887     # TODO: Check whether sending SIGTERM right away is okay, maybe we should
888     # give the daemon a moment to sort things out
889     if dtp.dest_import and not ie.success:
890       dtp.dest_import.Abort()
891
892
893 class _TransferInstDestCb(_TransferInstCbBase):
894   def ReportListening(self, ie, dtp, component):
895     """Called when daemon started listening.
896
897     """
898     assert self.src_cbs
899     assert dtp.src_export is None
900     assert dtp.dest_import
901     assert dtp.export_opts
902
903     self.feedback_fn("%s is now listening, starting export" % dtp.data.name)
904
905     # Start export on source node
906     de = DiskExport(self.lu, self.src_node_uuid, dtp.export_opts,
907                     self.dest_ip, ie.listen_port, self.instance,
908                     component, dtp.data.src_io, dtp.data.src_ioargs,
909                     self.timeouts, self.src_cbs, private=dtp)
910     ie.loop.Add(de)
911
912     dtp.src_export = de
913
914   def ReportConnected(self, ie, dtp):
915     """Called when a connection has been established.
916
917     """
918     self.feedback_fn("%s is receiving data on %s" %
919                      (dtp.data.name,
920                       self.lu.cfg.GetNodeName(self.dest_node_uuid)))
921
922   def ReportFinished(self, ie, dtp):
923     """Called when a transfer has finished.
924
925     """
926     if ie.success:
927       self.feedback_fn("%s finished receiving data" % dtp.data.name)
928     else:
929       self.feedback_fn("%s failed to receive data: %s (recent output: %s)" %
930                        (dtp.data.name, ie.final_message, ie.recent_output))
931
932     dtp.RecordResult(ie.success)
933
934     # TODO: Check whether sending SIGTERM right away is okay, maybe we should
935     # give the daemon a moment to sort things out
936     if dtp.src_export and not ie.success:
937       dtp.src_export.Abort()
938
939
940 class DiskTransfer(object):
941   def __init__(self, name, src_io, src_ioargs, dest_io, dest_ioargs,
942                finished_fn):
943     """Initializes this class.
944
945     @type name: string
946     @param name: User-visible name for this transfer (e.g. "disk/0")
947     @param src_io: Source I/O type
948     @param src_ioargs: Source I/O arguments
949     @param dest_io: Destination I/O type
950     @param dest_ioargs: Destination I/O arguments
951     @type finished_fn: callable
952     @param finished_fn: Function called once transfer has finished
953
954     """
955     self.name = name
956
957     self.src_io = src_io
958     self.src_ioargs = src_ioargs
959
960     self.dest_io = dest_io
961     self.dest_ioargs = dest_ioargs
962
963     self.finished_fn = finished_fn
964
965
966 class _DiskTransferPrivate(object):
967   def __init__(self, data, success, export_opts):
968     """Initializes this class.
969
970     @type data: L{DiskTransfer}
971     @type success: bool
972
973     """
974     self.data = data
975     self.success = success
976     self.export_opts = export_opts
977
978     self.src_export = None
979     self.dest_import = None
980
981   def RecordResult(self, success):
982     """Updates the status.
983
984     One failed part will cause the whole transfer to fail.
985
986     """
987     self.success = self.success and success
988
989
990 def _GetInstDiskMagic(base, instance_name, index):
991   """Computes the magic value for a disk export or import.
992
993   @type base: string
994   @param base: Random seed value (can be the same for all disks of a transfer)
995   @type instance_name: string
996   @param instance_name: Name of instance
997   @type index: number
998   @param index: Disk index
999
1000   """
1001   h = compat.sha1_hash()
1002   h.update(str(constants.RIE_VERSION))
1003   h.update(base)
1004   h.update(instance_name)
1005   h.update(str(index))
1006   return h.hexdigest()
1007
1008
1009 def TransferInstanceData(lu, feedback_fn, src_node_uuid, dest_node_uuid,
1010                          dest_ip, instance, all_transfers):
1011   """Transfers an instance's data from one node to another.
1012
1013   @param lu: Logical unit instance
1014   @param feedback_fn: Feedback function
1015   @type src_node_uuid: string
1016   @param src_node_uuid: Source node UUID
1017   @type dest_node_uuid: string
1018   @param dest_node_uuid: Destination node UUID
1019   @type dest_ip: string
1020   @param dest_ip: IP address of destination node
1021   @type instance: L{objects.Instance}
1022   @param instance: Instance object
1023   @type all_transfers: list of L{DiskTransfer} instances
1024   @param all_transfers: List of all disk transfers to be made
1025   @rtype: list
1026   @return: List with a boolean (True=successful, False=failed) for success for
1027            each transfer
1028
1029   """
1030   # Disable compression for all moves as these are all within the same cluster
1031   compress = constants.IEC_NONE
1032
1033   src_node_name = lu.cfg.GetNodeName(src_node_uuid)
1034   dest_node_name = lu.cfg.GetNodeName(dest_node_uuid)
1035
1036   logging.debug("Source node %s, destination node %s, compression '%s'",
1037                 src_node_name, dest_node_name, compress)
1038
1039   timeouts = ImportExportTimeouts(constants.DISK_TRANSFER_CONNECT_TIMEOUT)
1040   src_cbs = _TransferInstSourceCb(lu, feedback_fn, instance, timeouts,
1041                                   src_node_uuid, None, dest_node_uuid, dest_ip)
1042   dest_cbs = _TransferInstDestCb(lu, feedback_fn, instance, timeouts,
1043                                  src_node_uuid, src_cbs, dest_node_uuid,
1044                                  dest_ip)
1045
1046   all_dtp = []
1047
1048   base_magic = utils.GenerateSecret(6)
1049
1050   ieloop = ImportExportLoop(lu)
1051   try:
1052     for idx, transfer in enumerate(all_transfers):
1053       if transfer:
1054         feedback_fn("Exporting %s from %s to %s" %
1055                     (transfer.name, src_node_name, dest_node_name))
1056
1057         magic = _GetInstDiskMagic(base_magic, instance.name, idx)
1058         opts = objects.ImportExportOptions(key_name=None, ca_pem=None,
1059                                            compress=compress, magic=magic)
1060
1061         dtp = _DiskTransferPrivate(transfer, True, opts)
1062
1063         di = DiskImport(lu, dest_node_uuid, opts, instance, "disk%d" % idx,
1064                         transfer.dest_io, transfer.dest_ioargs,
1065                         timeouts, dest_cbs, private=dtp)
1066         ieloop.Add(di)
1067
1068         dtp.dest_import = di
1069       else:
1070         dtp = _DiskTransferPrivate(None, False, None)
1071
1072       all_dtp.append(dtp)
1073
1074     ieloop.Run()
1075   finally:
1076     ieloop.FinalizeAll()
1077
1078   assert len(all_dtp) == len(all_transfers)
1079   assert compat.all((dtp.src_export is None or
1080                       dtp.src_export.success is not None) and
1081                      (dtp.dest_import is None or
1082                       dtp.dest_import.success is not None)
1083                      for dtp in all_dtp), \
1084          "Not all imports/exports are finalized"
1085
1086   return [bool(dtp.success) for dtp in all_dtp]
1087
1088
1089 class _RemoteExportCb(ImportExportCbBase):
1090   def __init__(self, feedback_fn, disk_count):
1091     """Initializes this class.
1092
1093     """
1094     ImportExportCbBase.__init__(self)
1095     self._feedback_fn = feedback_fn
1096     self._dresults = [None] * disk_count
1097
1098   @property
1099   def disk_results(self):
1100     """Returns per-disk results.
1101
1102     """
1103     return self._dresults
1104
1105   def ReportConnected(self, ie, private):
1106     """Called when a connection has been established.
1107
1108     """
1109     (idx, _) = private
1110
1111     self._feedback_fn("Disk %s is now sending data" % idx)
1112
1113   def ReportProgress(self, ie, private):
1114     """Called when new progress information should be reported.
1115
1116     """
1117     (idx, _) = private
1118
1119     progress = ie.progress
1120     if not progress:
1121       return
1122
1123     self._feedback_fn("Disk %s sent %s" % (idx, FormatProgress(progress)))
1124
1125   def ReportFinished(self, ie, private):
1126     """Called when a transfer has finished.
1127
1128     """
1129     (idx, finished_fn) = private
1130
1131     if ie.success:
1132       self._feedback_fn("Disk %s finished sending data" % idx)
1133     else:
1134       self._feedback_fn("Disk %s failed to send data: %s (recent output: %s)" %
1135                         (idx, ie.final_message, ie.recent_output))
1136
1137     self._dresults[idx] = bool(ie.success)
1138
1139     if finished_fn:
1140       finished_fn()
1141
1142
1143 class ExportInstanceHelper:
1144   def __init__(self, lu, feedback_fn, instance):
1145     """Initializes this class.
1146
1147     @param lu: Logical unit instance
1148     @param feedback_fn: Feedback function
1149     @type instance: L{objects.Instance}
1150     @param instance: Instance object
1151
1152     """
1153     self._lu = lu
1154     self._feedback_fn = feedback_fn
1155     self._instance = instance
1156
1157     self._snap_disks = []
1158     self._removed_snaps = [False] * len(instance.disks)
1159
1160   def CreateSnapshots(self):
1161     """Creates an LVM snapshot for every disk of the instance.
1162
1163     """
1164     assert not self._snap_disks
1165
1166     instance = self._instance
1167     src_node = instance.primary_node
1168
1169     for idx, disk in enumerate(instance.disks):
1170       self._feedback_fn("Creating a snapshot of disk/%s on node %s" %
1171                         (idx, src_node))
1172
1173       # result.payload will be a snapshot of an lvm leaf of the one we
1174       # passed
1175       result = self._lu.rpc.call_blockdev_snapshot(src_node, (disk, instance))
1176       new_dev = False
1177       msg = result.fail_msg
1178       if msg:
1179         self._lu.LogWarning("Could not snapshot disk/%s on node %s: %s",
1180                             idx, src_node, msg)
1181       elif (not isinstance(result.payload, (tuple, list)) or
1182             len(result.payload) != 2):
1183         self._lu.LogWarning("Could not snapshot disk/%s on node %s: invalid"
1184                             " result '%s'", idx, src_node, result.payload)
1185       else:
1186         disk_id = tuple(result.payload)
1187         disk_params = constants.DISK_DT_DEFAULTS[constants.DT_PLAIN].copy()
1188         new_dev = objects.Disk(dev_type=constants.DT_PLAIN, size=disk.size,
1189                                logical_id=disk_id, physical_id=disk_id,
1190                                iv_name=disk.iv_name,
1191                                params=disk_params)
1192
1193       self._snap_disks.append(new_dev)
1194
1195     assert len(self._snap_disks) == len(instance.disks)
1196     assert len(self._removed_snaps) == len(instance.disks)
1197
1198   def _RemoveSnapshot(self, disk_index):
1199     """Removes an LVM snapshot.
1200
1201     @type disk_index: number
1202     @param disk_index: Index of the snapshot to be removed
1203
1204     """
1205     disk = self._snap_disks[disk_index]
1206     if disk and not self._removed_snaps[disk_index]:
1207       src_node = self._instance.primary_node
1208
1209       self._feedback_fn("Removing snapshot of disk/%s on node %s" %
1210                         (disk_index, src_node))
1211
1212       result = self._lu.rpc.call_blockdev_remove(src_node, disk)
1213       if result.fail_msg:
1214         self._lu.LogWarning("Could not remove snapshot for disk/%d from node"
1215                             " %s: %s", disk_index, src_node, result.fail_msg)
1216       else:
1217         self._removed_snaps[disk_index] = True
1218
1219   def LocalExport(self, dest_node):
1220     """Intra-cluster instance export.
1221
1222     @type dest_node: L{objects.Node}
1223     @param dest_node: Destination node
1224
1225     """
1226     instance = self._instance
1227     src_node_uuid = instance.primary_node
1228
1229     assert len(self._snap_disks) == len(instance.disks)
1230
1231     transfers = []
1232
1233     for idx, dev in enumerate(self._snap_disks):
1234       if not dev:
1235         transfers.append(None)
1236         continue
1237
1238       path = utils.PathJoin(pathutils.EXPORT_DIR, "%s.new" % instance.name,
1239                             dev.physical_id[1])
1240
1241       finished_fn = compat.partial(self._TransferFinished, idx)
1242
1243       # FIXME: pass debug option from opcode to backend
1244       dt = DiskTransfer("snapshot/%s" % idx,
1245                         constants.IEIO_SCRIPT, (dev, idx),
1246                         constants.IEIO_FILE, (path, ),
1247                         finished_fn)
1248       transfers.append(dt)
1249
1250     # Actually export data
1251     dresults = TransferInstanceData(self._lu, self._feedback_fn,
1252                                     src_node_uuid, dest_node.uuid,
1253                                     dest_node.secondary_ip,
1254                                     instance, transfers)
1255
1256     assert len(dresults) == len(instance.disks)
1257
1258     self._feedback_fn("Finalizing export on %s" % dest_node.name)
1259     result = self._lu.rpc.call_finalize_export(dest_node.uuid, instance,
1260                                                self._snap_disks)
1261     msg = result.fail_msg
1262     fin_resu = not msg
1263     if msg:
1264       self._lu.LogWarning("Could not finalize export for instance %s"
1265                           " on node %s: %s", instance.name, dest_node.name, msg)
1266
1267     return (fin_resu, dresults)
1268
1269   def RemoteExport(self, disk_info, key_name, dest_ca_pem, timeouts):
1270     """Inter-cluster instance export.
1271
1272     @type disk_info: list
1273     @param disk_info: Per-disk destination information
1274     @type key_name: string
1275     @param key_name: Name of X509 key to use
1276     @type dest_ca_pem: string
1277     @param dest_ca_pem: Destination X509 CA in PEM format
1278     @type timeouts: L{ImportExportTimeouts}
1279     @param timeouts: Timeouts for this import
1280
1281     """
1282     instance = self._instance
1283
1284     assert len(disk_info) == len(instance.disks)
1285
1286     cbs = _RemoteExportCb(self._feedback_fn, len(instance.disks))
1287
1288     ieloop = ImportExportLoop(self._lu)
1289     try:
1290       for idx, (dev, (host, port, magic)) in enumerate(zip(instance.disks,
1291                                                            disk_info)):
1292         # Decide whether to use IPv6
1293         ipv6 = netutils.IP6Address.IsValid(host)
1294
1295         opts = objects.ImportExportOptions(key_name=key_name,
1296                                            ca_pem=dest_ca_pem,
1297                                            magic=magic, ipv6=ipv6)
1298
1299         self._feedback_fn("Sending disk %s to %s:%s" % (idx, host, port))
1300         finished_fn = compat.partial(self._TransferFinished, idx)
1301         ieloop.Add(DiskExport(self._lu, instance.primary_node,
1302                               opts, host, port, instance, "disk%d" % idx,
1303                               constants.IEIO_SCRIPT, (dev, idx),
1304                               timeouts, cbs, private=(idx, finished_fn)))
1305
1306       ieloop.Run()
1307     finally:
1308       ieloop.FinalizeAll()
1309
1310     return (True, cbs.disk_results)
1311
1312   def _TransferFinished(self, idx):
1313     """Called once a transfer has finished.
1314
1315     @type idx: number
1316     @param idx: Disk index
1317
1318     """
1319     logging.debug("Transfer %s finished", idx)
1320     self._RemoveSnapshot(idx)
1321
1322   def Cleanup(self):
1323     """Remove all snapshots.
1324
1325     """
1326     assert len(self._removed_snaps) == len(self._instance.disks)
1327     for idx in range(len(self._instance.disks)):
1328       self._RemoveSnapshot(idx)
1329
1330
1331 class _RemoteImportCb(ImportExportCbBase):
1332   def __init__(self, feedback_fn, cds, x509_cert_pem, disk_count,
1333                external_address):
1334     """Initializes this class.
1335
1336     @type cds: string
1337     @param cds: Cluster domain secret
1338     @type x509_cert_pem: string
1339     @param x509_cert_pem: CA used for signing import key
1340     @type disk_count: number
1341     @param disk_count: Number of disks
1342     @type external_address: string
1343     @param external_address: External address of destination node
1344
1345     """
1346     ImportExportCbBase.__init__(self)
1347     self._feedback_fn = feedback_fn
1348     self._cds = cds
1349     self._x509_cert_pem = x509_cert_pem
1350     self._disk_count = disk_count
1351     self._external_address = external_address
1352
1353     self._dresults = [None] * disk_count
1354     self._daemon_port = [None] * disk_count
1355
1356     self._salt = utils.GenerateSecret(8)
1357
1358   @property
1359   def disk_results(self):
1360     """Returns per-disk results.
1361
1362     """
1363     return self._dresults
1364
1365   def _CheckAllListening(self):
1366     """Checks whether all daemons are listening.
1367
1368     If all daemons are listening, the information is sent to the client.
1369
1370     """
1371     if not compat.all(dp is not None for dp in self._daemon_port):
1372       return
1373
1374     host = self._external_address
1375
1376     disks = []
1377     for idx, (port, magic) in enumerate(self._daemon_port):
1378       disks.append(ComputeRemoteImportDiskInfo(self._cds, self._salt,
1379                                                idx, host, port, magic))
1380
1381     assert len(disks) == self._disk_count
1382
1383     self._feedback_fn(constants.ELOG_REMOTE_IMPORT, {
1384       "disks": disks,
1385       "x509_ca": self._x509_cert_pem,
1386       })
1387
1388   def ReportListening(self, ie, private, _):
1389     """Called when daemon started listening.
1390
1391     """
1392     (idx, ) = private
1393
1394     self._feedback_fn("Disk %s is now listening" % idx)
1395
1396     assert self._daemon_port[idx] is None
1397
1398     self._daemon_port[idx] = (ie.listen_port, ie.magic)
1399
1400     self._CheckAllListening()
1401
1402   def ReportConnected(self, ie, private):
1403     """Called when a connection has been established.
1404
1405     """
1406     (idx, ) = private
1407
1408     self._feedback_fn("Disk %s is now receiving data" % idx)
1409
1410   def ReportFinished(self, ie, private):
1411     """Called when a transfer has finished.
1412
1413     """
1414     (idx, ) = private
1415
1416     # Daemon is certainly no longer listening
1417     self._daemon_port[idx] = None
1418
1419     if ie.success:
1420       self._feedback_fn("Disk %s finished receiving data" % idx)
1421     else:
1422       self._feedback_fn(("Disk %s failed to receive data: %s"
1423                          " (recent output: %s)") %
1424                         (idx, ie.final_message, ie.recent_output))
1425
1426     self._dresults[idx] = bool(ie.success)
1427
1428
1429 def RemoteImport(lu, feedback_fn, instance, pnode, source_x509_ca,
1430                  cds, timeouts):
1431   """Imports an instance from another cluster.
1432
1433   @param lu: Logical unit instance
1434   @param feedback_fn: Feedback function
1435   @type instance: L{objects.Instance}
1436   @param instance: Instance object
1437   @type pnode: L{objects.Node}
1438   @param pnode: Primary node of instance as an object
1439   @type source_x509_ca: OpenSSL.crypto.X509
1440   @param source_x509_ca: Import source's X509 CA
1441   @type cds: string
1442   @param cds: Cluster domain secret
1443   @type timeouts: L{ImportExportTimeouts}
1444   @param timeouts: Timeouts for this import
1445
1446   """
1447   source_ca_pem = OpenSSL.crypto.dump_certificate(OpenSSL.crypto.FILETYPE_PEM,
1448                                                   source_x509_ca)
1449
1450   magic_base = utils.GenerateSecret(6)
1451
1452   # Decide whether to use IPv6
1453   ipv6 = netutils.IP6Address.IsValid(pnode.primary_ip)
1454
1455   # Create crypto key
1456   result = lu.rpc.call_x509_cert_create(instance.primary_node,
1457                                         constants.RIE_CERT_VALIDITY)
1458   result.Raise("Can't create X509 key and certificate on %s" % result.node)
1459
1460   (x509_key_name, x509_cert_pem) = result.payload
1461   try:
1462     # Load certificate
1463     x509_cert = OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_PEM,
1464                                                 x509_cert_pem)
1465
1466     # Sign certificate
1467     signed_x509_cert_pem = \
1468       utils.SignX509Certificate(x509_cert, cds, utils.GenerateSecret(8))
1469
1470     cbs = _RemoteImportCb(feedback_fn, cds, signed_x509_cert_pem,
1471                           len(instance.disks), pnode.primary_ip)
1472
1473     ieloop = ImportExportLoop(lu)
1474     try:
1475       for idx, dev in enumerate(instance.disks):
1476         magic = _GetInstDiskMagic(magic_base, instance.name, idx)
1477
1478         # Import daemon options
1479         opts = objects.ImportExportOptions(key_name=x509_key_name,
1480                                            ca_pem=source_ca_pem,
1481                                            magic=magic, ipv6=ipv6)
1482
1483         ieloop.Add(DiskImport(lu, instance.primary_node, opts, instance,
1484                               "disk%d" % idx,
1485                               constants.IEIO_SCRIPT, (dev, idx),
1486                               timeouts, cbs, private=(idx, )))
1487
1488       ieloop.Run()
1489     finally:
1490       ieloop.FinalizeAll()
1491   finally:
1492     # Remove crypto key and certificate
1493     result = lu.rpc.call_x509_cert_remove(instance.primary_node, x509_key_name)
1494     result.Raise("Can't remove X509 key and certificate on %s" % result.node)
1495
1496   return cbs.disk_results
1497
1498
1499 def _GetImportExportHandshakeMessage(version):
1500   """Returns the handshake message for a RIE protocol version.
1501
1502   @type version: number
1503
1504   """
1505   return "%s:%s" % (version, constants.RIE_HANDSHAKE)
1506
1507
1508 def ComputeRemoteExportHandshake(cds):
1509   """Computes the remote import/export handshake.
1510
1511   @type cds: string
1512   @param cds: Cluster domain secret
1513
1514   """
1515   salt = utils.GenerateSecret(8)
1516   msg = _GetImportExportHandshakeMessage(constants.RIE_VERSION)
1517   return (constants.RIE_VERSION, utils.Sha1Hmac(cds, msg, salt=salt), salt)
1518
1519
1520 def CheckRemoteExportHandshake(cds, handshake):
1521   """Checks the handshake of a remote import/export.
1522
1523   @type cds: string
1524   @param cds: Cluster domain secret
1525   @type handshake: sequence
1526   @param handshake: Handshake sent by remote peer
1527
1528   """
1529   try:
1530     (version, hmac_digest, hmac_salt) = handshake
1531   except (TypeError, ValueError), err:
1532     return "Invalid data: %s" % err
1533
1534   if not utils.VerifySha1Hmac(cds, _GetImportExportHandshakeMessage(version),
1535                               hmac_digest, salt=hmac_salt):
1536     return "Hash didn't match, clusters don't share the same domain secret"
1537
1538   if version != constants.RIE_VERSION:
1539     return ("Clusters don't have the same remote import/export protocol"
1540             " (local=%s, remote=%s)" %
1541             (constants.RIE_VERSION, version))
1542
1543   return None
1544
1545
1546 def _GetRieDiskInfoMessage(disk_index, host, port, magic):
1547   """Returns the hashed text for import/export disk information.
1548
1549   @type disk_index: number
1550   @param disk_index: Index of disk (included in hash)
1551   @type host: string
1552   @param host: Hostname
1553   @type port: number
1554   @param port: Daemon port
1555   @type magic: string
1556   @param magic: Magic value
1557
1558   """
1559   return "%s:%s:%s:%s" % (disk_index, host, port, magic)
1560
1561
1562 def CheckRemoteExportDiskInfo(cds, disk_index, disk_info):
1563   """Verifies received disk information for an export.
1564
1565   @type cds: string
1566   @param cds: Cluster domain secret
1567   @type disk_index: number
1568   @param disk_index: Index of disk (included in hash)
1569   @type disk_info: sequence
1570   @param disk_info: Disk information sent by remote peer
1571
1572   """
1573   try:
1574     (host, port, magic, hmac_digest, hmac_salt) = disk_info
1575   except (TypeError, ValueError), err:
1576     raise errors.GenericError("Invalid data: %s" % err)
1577
1578   if not (host and port and magic):
1579     raise errors.GenericError("Missing destination host, port or magic")
1580
1581   msg = _GetRieDiskInfoMessage(disk_index, host, port, magic)
1582
1583   if not utils.VerifySha1Hmac(cds, msg, hmac_digest, salt=hmac_salt):
1584     raise errors.GenericError("HMAC is wrong")
1585
1586   if netutils.IP6Address.IsValid(host) or netutils.IP4Address.IsValid(host):
1587     destination = host
1588   else:
1589     destination = netutils.Hostname.GetNormalizedName(host)
1590
1591   return (destination,
1592           utils.ValidateServiceName(port),
1593           magic)
1594
1595
1596 def ComputeRemoteImportDiskInfo(cds, salt, disk_index, host, port, magic):
1597   """Computes the signed disk information for a remote import.
1598
1599   @type cds: string
1600   @param cds: Cluster domain secret
1601   @type salt: string
1602   @param salt: HMAC salt
1603   @type disk_index: number
1604   @param disk_index: Index of disk (included in hash)
1605   @type host: string
1606   @param host: Hostname
1607   @type port: number
1608   @param port: Daemon port
1609   @type magic: string
1610   @param magic: Magic value
1611
1612   """
1613   msg = _GetRieDiskInfoMessage(disk_index, host, port, magic)
1614   hmac_digest = utils.Sha1Hmac(cds, msg, salt=salt)
1615   return (host, port, magic, hmac_digest, salt)
1616
1617
1618 def CalculateGroupIPolicy(cluster, group):
1619   """Calculate instance policy for group.
1620
1621   """
1622   return cluster.SimpleFillIPolicy(group.ipolicy)
1623
1624
1625 def ComputeDiskSize(disk_template, disks):
1626   """Compute disk size requirements according to disk template
1627
1628   """
1629   # Required free disk space as a function of disk and swap space
1630   req_size_dict = {
1631     constants.DT_DISKLESS: 0,
1632     constants.DT_PLAIN: sum(d[constants.IDISK_SIZE] for d in disks),
1633     # 128 MB are added for drbd metadata for each disk
1634     constants.DT_DRBD8:
1635       sum(d[constants.IDISK_SIZE] + constants.DRBD_META_SIZE for d in disks),
1636     constants.DT_FILE: sum(d[constants.IDISK_SIZE] for d in disks),
1637     constants.DT_SHARED_FILE: sum(d[constants.IDISK_SIZE] for d in disks),
1638     constants.DT_BLOCK: 0,
1639     constants.DT_RBD: sum(d[constants.IDISK_SIZE] for d in disks),
1640     constants.DT_EXT: sum(d[constants.IDISK_SIZE] for d in disks),
1641   }
1642
1643   if disk_template not in req_size_dict:
1644     raise errors.ProgrammerError("Disk template '%s' size requirement"
1645                                  " is unknown" % disk_template)
1646
1647   return req_size_dict[disk_template]