Statistics
| Branch: | Tag: | Revision:

root / lib / masterd / instance.py @ acd65a16

History | View | Annotate | Download (41.9 kB)

1
#
2
#
3

    
4
# Copyright (C) 2010 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Instance-related functions and classes for masterd.
23

24
"""
25

    
26
import logging
27
import time
28
import OpenSSL
29

    
30
from ganeti import constants
31
from ganeti import errors
32
from ganeti import compat
33
from ganeti import utils
34
from ganeti import objects
35

    
36

    
37
class _ImportExportError(Exception):
  """Local exception to report import/export errors.

  Raised internally by the import/export helpers in this module and caught
  by the surrounding loop code, which converts it into a finalized transfer
  failure; it is private and never propagates to callers of this module.

  """
41

    
42

    
43
class ImportExportTimeouts(object):
  """Container for the timeouts used by a single import/export.

  All values are in seconds. Only the connect timeout is mandatory; the
  others default to the class-level constants below.

  """
  #: Time until daemon starts writing status file
  DEFAULT_READY_TIMEOUT = 10

  #: Length of time until errors cause hard failure
  DEFAULT_ERROR_TIMEOUT = 10

  #: Time after which daemon must be listening
  DEFAULT_LISTEN_TIMEOUT = 10

  #: Progress update interval
  DEFAULT_PROGRESS_INTERVAL = 60

  __slots__ = [
    "error",
    "ready",
    "listen",
    "connect",
    "progress",
    ]

  def __init__(self, connect,
               listen=DEFAULT_LISTEN_TIMEOUT,
               error=DEFAULT_ERROR_TIMEOUT,
               ready=DEFAULT_READY_TIMEOUT,
               progress=DEFAULT_PROGRESS_INTERVAL):
    """Initializes this class.

    @type connect: number
    @param connect: Timeout for establishing connection
    @type listen: number
    @param listen: Timeout for starting to listen for connections
    @type error: number
    @param error: Length of time until errors cause hard failure
    @type ready: number
    @param ready: Timeout for daemon to become ready
    @type progress: number
    @param progress: Progress update interval

    """
    self.connect = connect
    self.listen = listen
    self.ready = ready
    self.error = error
    self.progress = progress
88

    
89

    
90
class ImportExportCbBase(object):
  """Callbacks for disk import/export.

  Subclasses override the notifications they care about; every default
  implementation is a no-op, so overriding is always optional.

  """
  def ReportListening(self, ie, private):
    """Called when daemon started listening.

    @type ie: Subclass of L{_DiskImportExportBase}
    @param ie: Import/export object
    @param private: Private data passed to import/export object

    """

  def ReportConnected(self, ie, private):
    """Called when a connection has been established.

    @type ie: Subclass of L{_DiskImportExportBase}
    @param ie: Import/export object
    @param private: Private data passed to import/export object

    """

  def ReportProgress(self, ie, private):
    """Called when new progress information should be reported.

    @type ie: Subclass of L{_DiskImportExportBase}
    @param ie: Import/export object
    @param private: Private data passed to import/export object

    """

  def ReportFinished(self, ie, private):
    """Called when a transfer has finished.

    @type ie: Subclass of L{_DiskImportExportBase}
    @param ie: Import/export object
    @param private: Private data passed to import/export object

    """
129

    
130

    
131
def _TimeoutExpired(epoch, timeout, _time_fn=time.time):
132
  """Checks whether a timeout has expired.
133

134
  """
135
  return _time_fn() > (epoch + timeout)
136

    
137

    
138
class _DiskImportExportBase(object):
  """Base class for a single disk import or export.

  Tracks the lifecycle of one remote import/export daemon: start, waiting
  to listen/connect, progress reporting, completion and cleanup. The
  C{Check*} methods are polled by L{ImportExportLoop}. Subclasses implement
  the direction-specific parts (L{_StartDaemon}, L{CheckListening},
  L{_GetConnectedCheckEpoch}).

  """
  # Human-readable mode name ("import"/"export"); must be set by subclasses
  MODE_TEXT = None

  def __init__(self, lu, node_name, opts,
               instance, timeouts, cbs, private=None):
    """Initializes this class.

    @param lu: Logical unit instance
    @type node_name: string
    @param node_name: Node name for import
    @type opts: L{objects.ImportExportOptions}
    @param opts: Import/export daemon options
    @type instance: L{objects.Instance}
    @param instance: Instance object
    @type timeouts: L{ImportExportTimeouts}
    @param timeouts: Timeouts for this import
    @type cbs: L{ImportExportCbBase}
    @param cbs: Callbacks
    @param private: Private data for callback functions

    """
    assert self.MODE_TEXT

    self._lu = lu
    self.node_name = node_name
    self._opts = opts
    self._instance = instance
    self._timeouts = timeouts
    self._cbs = cbs
    self._private = private

    # Parent loop; set exactly once via SetLoop
    self._loop = None

    # Timestamps; each is None until the corresponding event happened and
    # therefore doubles as an "already happened" flag
    self._ts_begin = None
    self._ts_connected = None
    self._ts_finished = None
    self._ts_cleanup = None
    self._ts_last_progress = None
    self._ts_last_error = None

    # Transfer status; success stays None while the transfer is active
    self.success = None
    self.final_message = None

    # Daemon status
    self._daemon_name = None
    self._daemon = None

  @property
  def recent_output(self):
    """Returns the most recent output from the daemon.

    C{None} if no status data has been received yet.

    """
    if self._daemon:
      return self._daemon.recent_output

    return None

  @property
  def progress(self):
    """Returns transfer progress information.

    @return: C{None} if no status data is available, otherwise a tuple of
      (megabytes transferred, throughput, percent done, ETA)

    """
    if not self._daemon:
      return None

    return (self._daemon.progress_mbytes,
            self._daemon.progress_throughput,
            self._daemon.progress_percent,
            self._daemon.progress_eta)

  @property
  def active(self):
    """Determines whether this transport is still active.

    """
    # success is only set (True or False) once the transfer finished
    return self.success is None

  @property
  def loop(self):
    """Returns parent loop.

    @rtype: L{ImportExportLoop}

    """
    return self._loop

  def SetLoop(self, loop):
    """Sets the parent loop.

    @type loop: L{ImportExportLoop}

    """
    if self._loop:
      raise errors.ProgrammerError("Loop can only be set once")

    self._loop = loop

  def _StartDaemon(self):
    """Starts the import/export daemon.

    Must be implemented by subclasses; returns an RPC result object.

    """
    raise NotImplementedError()

  def CheckDaemon(self):
    """Checks whether daemon has been started and if not, starts it.

    @rtype: string
    @return: Daemon name

    """
    # Must not be called after Finalize cleaned the daemon up
    assert self._ts_cleanup is None

    if self._daemon_name is None:
      assert self._ts_begin is None

      result = self._StartDaemon()
      if result.fail_msg:
        raise _ImportExportError("Failed to start %s on %s: %s" %
                                 (self.MODE_TEXT, self.node_name,
                                  result.fail_msg))

      daemon_name = result.payload

      logging.info("Started %s %r on %s", self.MODE_TEXT, daemon_name,
                   self.node_name)

      # All timeouts are measured relative to this moment
      self._ts_begin = time.time()
      self._daemon_name = daemon_name

    return self._daemon_name

  def GetDaemonName(self):
    """Returns the daemon name.

    """
    assert self._daemon_name, "Daemon has not been started"
    assert self._ts_cleanup is None
    return self._daemon_name

  def Abort(self):
    """Sends SIGTERM to import/export daemon (if still active).

    @rtype: bool
    @return: Whether the abort RPC succeeded (also C{True} when there was
      no daemon to abort)

    """
    if self._daemon_name:
      self._lu.LogWarning("Aborting %s %r on %s",
                          self.MODE_TEXT, self._daemon_name, self.node_name)
      result = self._lu.rpc.call_impexp_abort(self.node_name, self._daemon_name)
      if result.fail_msg:
        self._lu.LogWarning("Failed to abort %s %r on %s: %s",
                            self.MODE_TEXT, self._daemon_name,
                            self.node_name, result.fail_msg)
        return False

    return True

  def _SetDaemonData(self, data):
    """Internal function for updating status daemon data.

    @type data: L{objects.ImportExportStatus}
    @param data: Daemon status data
    @rtype: bool
    @return: Whether status data was available (daemon considered ready)

    """
    assert self._ts_begin is not None

    if not data:
      # No status file yet; fail hard once the ready timeout has passed
      if _TimeoutExpired(self._ts_begin, self._timeouts.ready):
        raise _ImportExportError("Didn't become ready after %s seconds" %
                                 self._timeouts.ready)

      return False

    self._daemon = data

    return True

  def SetDaemonData(self, success, data):
    """Updates daemon status data.

    @type success: bool
    @param success: Whether fetching data was successful or not
    @type data: L{objects.ImportExportStatus}
    @param data: Daemon status data
    @rtype: bool
    @return: Whether the daemon is ready (has usable status data)

    """
    if not success:
      # Tolerate transient fetch errors until the error timeout expires
      if self._ts_last_error is None:
        self._ts_last_error = time.time()

      elif _TimeoutExpired(self._ts_last_error, self._timeouts.error):
        raise _ImportExportError("Too many errors while updating data")

      return False

    # Successful fetch resets the error window
    self._ts_last_error = None

    return self._SetDaemonData(data)

  def CheckListening(self):
    """Checks whether the daemon is listening.

    Implemented by subclasses (only imports actually listen).

    """
    raise NotImplementedError()

  def _GetConnectedCheckEpoch(self):
    """Returns timeout to calculate connect timeout.

    Implemented by subclasses; returns the reference timestamp from which
    the connect timeout is measured.

    """
    raise NotImplementedError()

  def CheckConnected(self):
    """Checks whether the daemon is connected.

    @rtype: bool
    @return: Whether the daemon is connected

    """
    assert self._daemon, "Daemon status missing"

    if self._ts_connected is not None:
      # Connection was already observed and reported earlier
      return True

    if self._daemon.connected:
      self._ts_connected = time.time()

      # TODO: Log remote peer
      logging.debug("%s %r on %s is now connected",
                    self.MODE_TEXT, self._daemon_name, self.node_name)

      self._cbs.ReportConnected(self, self._private)

      return True

    if _TimeoutExpired(self._GetConnectedCheckEpoch(), self._timeouts.connect):
      raise _ImportExportError("Not connected after %s seconds" %
                               self._timeouts.connect)

    return False

  def _CheckProgress(self):
    """Checks whether a progress update should be reported.

    Rate-limited by the progress interval; only reports once the daemon
    has actual throughput numbers.

    """
    if ((self._ts_last_progress is None or
         _TimeoutExpired(self._ts_last_progress, self._timeouts.progress)) and
        self._daemon and
        self._daemon.progress_mbytes is not None and
        self._daemon.progress_throughput is not None):
      self._cbs.ReportProgress(self, self._private)
      self._ts_last_progress = time.time()

  def CheckFinished(self):
    """Checks whether the daemon exited.

    @rtype: bool
    @return: Whether the transfer is finished

    """
    assert self._daemon, "Daemon status missing"

    if self._ts_finished:
      return True

    if self._daemon.exit_status is None:
      # Still running; use the opportunity to report progress
      # TODO: Adjust delay for ETA expiring soon
      self._CheckProgress()
      return False

    self._ts_finished = time.time()

    self._ReportFinished(self._daemon.exit_status == 0,
                         self._daemon.error_message)

    return True

  def _ReportFinished(self, success, message):
    """Transfer is finished or daemon exited.

    @type success: bool
    @param success: Whether the transfer was successful
    @type message: string
    @param message: Error message

    """
    assert self.success is None

    self.success = success
    self.final_message = message

    if success:
      logging.info("%s %r on %s succeeded", self.MODE_TEXT, self._daemon_name,
                   self.node_name)
    elif self._daemon_name:
      self._lu.LogWarning("%s %r on %s failed: %s",
                          self.MODE_TEXT, self._daemon_name, self.node_name,
                          message)
    else:
      # The daemon never started, so there is no daemon name to report
      self._lu.LogWarning("%s on %s failed: %s", self.MODE_TEXT,
                          self.node_name, message)

    self._cbs.ReportFinished(self, self._private)

  def _Finalize(self):
    """Makes the RPC call to finalize this import/export.

    """
    return self._lu.rpc.call_impexp_cleanup(self.node_name, self._daemon_name)

  def Finalize(self, error=None):
    """Finalizes this import/export.

    @param error: Optional error message to record as the final status
    @rtype: bool
    @return: Whether the cleanup RPC succeeded (C{True} when there was
      nothing to clean up)

    """
    if self._daemon_name:
      logging.info("Finalizing %s %r on %s",
                   self.MODE_TEXT, self._daemon_name, self.node_name)

      result = self._Finalize()
      if result.fail_msg:
        self._lu.LogWarning("Failed to finalize %s %r on %s: %s",
                            self.MODE_TEXT, self._daemon_name,
                            self.node_name, result.fail_msg)
        return False

      # Daemon is no longer running
      self._daemon_name = None
      self._ts_cleanup = time.time()

    if error:
      self._ReportFinished(False, error)

    return True
471

    
472

    
473
class DiskImport(_DiskImportExportBase):
  """Single disk import, i.e. the receiving side of a transfer.

  """
  MODE_TEXT = "import"

  def __init__(self, lu, node_name, opts, instance,
               dest, dest_args, timeouts, cbs, private=None):
    """Initializes this class.

    @param lu: Logical unit instance
    @type node_name: string
    @param node_name: Node name for import
    @type opts: L{objects.ImportExportOptions}
    @param opts: Import/export daemon options
    @type instance: L{objects.Instance}
    @param instance: Instance object
    @param dest: I/O destination
    @param dest_args: I/O arguments
    @type timeouts: L{ImportExportTimeouts}
    @param timeouts: Timeouts for this import
    @type cbs: L{ImportExportCbBase}
    @param cbs: Callbacks
    @param private: Private data for callback functions

    """
    _DiskImportExportBase.__init__(self, lu, node_name, opts,
                                   instance, timeouts, cbs, private)
    self._dest = dest
    self._dest_args = dest_args

    # Set once the daemon reported a listening port
    self._ts_listening = None

  @property
  def listen_port(self):
    """Port the import daemon listens on, C{None} if not yet known.

    """
    daemon = self._daemon
    if daemon:
      return daemon.listen_port

    return None

  def _StartDaemon(self):
    """Sends the RPC to start the import daemon on the target node.

    """
    return self._lu.rpc.call_import_start(self.node_name, self._opts,
                                          self._instance,
                                          self._dest, self._dest_args)

  def CheckListening(self):
    """Checks whether the daemon is listening.

    @rtype: bool
    @return: Whether the daemon is listening

    """
    assert self._daemon, "Daemon status missing"

    if self._ts_listening is not None:
      # Already observed and reported before
      return True

    port = self._daemon.listen_port
    if port is None:
      # No socket yet; give up once the listen timeout has passed
      if _TimeoutExpired(self._ts_begin, self._timeouts.listen):
        raise _ImportExportError("Not listening after %s seconds" %
                                 self._timeouts.listen)

      return False

    self._ts_listening = time.time()

    logging.debug("Import %r on %s is now listening on port %s",
                  self._daemon_name, self.node_name, port)

    self._cbs.ReportListening(self, self._private)

    return True

  def _GetConnectedCheckEpoch(self):
    """Returns the time since we started listening.

    """
    assert self._ts_listening is not None, \
           ("Checking whether an import is connected is only useful"
            " once it's been listening")

    return self._ts_listening
560

    
561

    
562
class DiskExport(_DiskImportExportBase):
  """Single disk export, i.e. the sending side of a transfer.

  """
  MODE_TEXT = "export"

  def __init__(self, lu, node_name, opts,
               dest_host, dest_port, instance, source, source_args,
               timeouts, cbs, private=None):
    """Initializes this class.

    @param lu: Logical unit instance
    @type node_name: string
    @param node_name: Node name for import
    @type opts: L{objects.ImportExportOptions}
    @param opts: Import/export daemon options
    @type dest_host: string
    @param dest_host: Destination host name or IP address
    @type dest_port: number
    @param dest_port: Destination port number
    @type instance: L{objects.Instance}
    @param instance: Instance object
    @param source: I/O source
    @param source_args: I/O source arguments
    @type timeouts: L{ImportExportTimeouts}
    @param timeouts: Timeouts for this import
    @type cbs: L{ImportExportCbBase}
    @param cbs: Callbacks
    @param private: Private data for callback functions

    """
    _DiskImportExportBase.__init__(self, lu, node_name, opts,
                                   instance, timeouts, cbs, private)

    # Where the data is sent to
    self._dest_host = dest_host
    self._dest_port = dest_port

    # Where the data comes from
    self._source = source
    self._source_args = source_args

  def _StartDaemon(self):
    """Sends the RPC to start the export daemon on the source node.

    """
    rpc = self._lu.rpc
    return rpc.call_export_start(self.node_name, self._opts,
                                 self._dest_host, self._dest_port,
                                 self._instance, self._source,
                                 self._source_args)

  def CheckListening(self):
    """Checks whether the daemon is listening.

    Exports connect actively and never listen, hence this always succeeds.

    """
    # Only an import can be listening
    return True

  def _GetConnectedCheckEpoch(self):
    """Returns the time since the daemon started.

    The connect timeout for an export is measured from daemon startup.

    """
    assert self._ts_begin is not None

    return self._ts_begin
620

    
621

    
622
def FormatProgress(progress):
  """Formats progress information for user consumption

  @type progress: tuple
  @param progress: Tuple of (megabytes, throughput, percent, ETA) as
    returned by L{_DiskImportExportBase.progress}
  @rtype: string

  """
  mbytes, throughput, percent, eta = progress

  fields = [utils.FormatUnit(mbytes, "h")]

  # Not using FormatUnit as it doesn't support kilobytes
  fields.append("%0.1f MiB/s" % throughput)

  if percent is not None:
    fields.append("%d%%" % percent)

  if eta is not None:
    fields.append("ETA %s" % utils.FormatSeconds(eta))

  return utils.CommaJoin(fields)
642

    
643

    
644
class ImportExportLoop(object):
  """Main loop driving a set of concurrent disk imports/exports.

  Polls all registered import/export objects until every transfer has
  finished, adapting the polling interval to the transfers' state.

  """
  MIN_DELAY = 1.0
  MAX_DELAY = 20.0

  def __init__(self, lu):
    """Initializes this class.

    @param lu: Logical unit instance

    """
    self._lu = lu
    self._queue = []
    self._pending_add = []

  def Add(self, diskie):
    """Adds an import/export object to the loop.

    @type diskie: Subclass of L{_DiskImportExportBase}
    @param diskie: Import/export object

    """
    assert diskie not in self._pending_add
    assert diskie.loop is None

    diskie.SetLoop(self)

    # Adding new objects to a staging list is necessary, otherwise the main
    # loop gets confused if callbacks modify the queue while the main loop is
    # iterating over it.
    self._pending_add.append(diskie)

  @staticmethod
  def _CollectDaemonStatus(lu, daemons):
    """Collects the status for all import/export daemons.

    @param lu: Logical unit instance
    @type daemons: dict
    @param daemons: Mapping of node name to list of daemon names
    @rtype: dict
    @return: Mapping of node name to a dict mapping daemon name to status;
      nodes whose status RPC failed are left out

    """
    daemon_status = {}

    for node_name, names in daemons.items():
      result = lu.rpc.call_impexp_status(node_name, names)
      if result.fail_msg:
        lu.LogWarning("Failed to get daemon status on %s: %s",
                      node_name, result.fail_msg)
        continue

      assert len(names) == len(result.payload)

      daemon_status[node_name] = dict(zip(names, result.payload))

    return daemon_status

  @staticmethod
  def _GetActiveDaemonNames(queue):
    """Gets the names of all active daemons.

    Starts daemons that have not been started yet; transfers whose daemon
    fails to start are finalized with an error immediately.

    @rtype: dict
    @return: Mapping of node name to list of daemon names

    """
    result = {}
    for diskie in queue:
      if not diskie.active:
        continue

      try:
        # Start daemon if necessary
        daemon_name = diskie.CheckDaemon()
      except _ImportExportError as err:
        logging.exception("%s failed", diskie.MODE_TEXT)
        diskie.Finalize(error=str(err))
        continue

      result.setdefault(diskie.node_name, []).append(daemon_name)

    assert len(queue) >= len(result)
    assert len(queue) >= sum(len(names) for names in result.values())

    logging.debug("daemons=%r", result)

    return result

  def _AddPendingToQueue(self):
    """Adds all pending import/export objects to the internal queue.

    """
    assert compat.all(diskie not in self._queue and diskie.loop == self
                      for diskie in self._pending_add)

    self._queue.extend(self._pending_add)

    del self._pending_add[:]

  def Run(self):
    """Utility main loop.

    Returns once no transfer is active anymore; L{FinalizeAll} should
    still be called afterwards to clean up.

    """
    while True:
      self._AddPendingToQueue()

      # Collect all active daemon names
      daemons = self._GetActiveDaemonNames(self._queue)
      if not daemons:
        break

      # Collect daemon status data
      data = self._CollectDaemonStatus(self._lu, daemons)

      # Use data
      delay = self.MAX_DELAY
      for diskie in self._queue:
        if not diskie.active:
          continue

        try:
          try:
            all_daemon_data = data[diskie.node_name]
          except KeyError:
            # Status RPC for this node failed; count it as a fetch error
            result = diskie.SetDaemonData(False, None)
          else:
            result = \
              diskie.SetDaemonData(True,
                                   all_daemon_data[diskie.GetDaemonName()])

          if not result:
            # Daemon not yet ready, retry soon
            delay = min(3.0, delay)
            continue

          if diskie.CheckFinished():
            # Transfer finished
            diskie.Finalize()
            continue

          # Normal case: check again in 5 seconds
          delay = min(5.0, delay)

          if not diskie.CheckListening():
            # Not yet listening, retry soon
            delay = min(1.0, delay)
            continue

          if not diskie.CheckConnected():
            # Not yet connected, retry soon
            delay = min(1.0, delay)
            continue

        except _ImportExportError as err:
          logging.exception("%s failed", diskie.MODE_TEXT)
          diskie.Finalize(error=str(err))

      if not compat.any([diskie.active for diskie in self._queue]):
        break

      # Wait a bit
      delay = min(self.MAX_DELAY, max(self.MIN_DELAY, delay))
      logging.debug("Waiting for %ss", delay)
      time.sleep(delay)

  def FinalizeAll(self):
    """Finalizes all pending transfers.

    @rtype: bool
    @return: Whether all transfers were finalized successfully

    """
    success = True

    for diskie in self._queue:
      success = diskie.Finalize() and success

    return success
807

    
808

    
809
class _TransferInstCbBase(ImportExportCbBase):
  """Shared state for the source/destination transfer callbacks.

  """
  def __init__(self, lu, feedback_fn, instance, timeouts, src_node, src_cbs,
               dest_node, dest_ip, export_opts):
    """Initializes this class.

    """
    ImportExportCbBase.__init__(self)

    # Common context
    self.lu = lu
    self.feedback_fn = feedback_fn
    self.instance = instance
    self.timeouts = timeouts
    self.export_opts = export_opts

    # Source side
    self.src_node = src_node
    self.src_cbs = src_cbs

    # Destination side
    self.dest_node = dest_node
    self.dest_ip = dest_ip
826

    
827

    
828
class _TransferInstSourceCb(_TransferInstCbBase):
  """Callbacks for the sending (export) side of an instance transfer.

  """
  def ReportConnected(self, ie, dtp):
    """Called when a connection has been established.

    """
    assert self.src_cbs is None
    assert dtp.src_export == ie
    assert dtp.dest_import

    self.feedback_fn("%s is sending data on %s" %
                     (dtp.data.name, ie.node_name))

  def ReportProgress(self, ie, dtp):
    """Called when new progress information should be reported.

    """
    info = ie.progress
    if info:
      self.feedback_fn("%s sent %s" % (dtp.data.name, FormatProgress(info)))

  def ReportFinished(self, ie, dtp):
    """Called when a transfer has finished.

    """
    assert self.src_cbs is None
    assert dtp.src_export == ie
    assert dtp.dest_import

    if ie.success:
      self.feedback_fn("%s finished sending data" % dtp.data.name)
    else:
      self.feedback_fn("%s failed to send data: %s (recent output: %r)" %
                       (dtp.data.name, ie.final_message, ie.recent_output))

    dtp.RecordResult(ie.success)

    finished_cb = dtp.data.finished_fn
    if finished_cb:
      finished_cb()

    # TODO: Check whether sending SIGTERM right away is okay, maybe we should
    # give the daemon a moment to sort things out
    if dtp.dest_import and not ie.success:
      dtp.dest_import.Abort()
874

    
875

    
876
class _TransferInstDestCb(_TransferInstCbBase):
  """Callbacks for the receiving (import) side of an instance transfer.

  """
  def ReportListening(self, ie, dtp):
    """Called when daemon started listening.

    """
    assert self.src_cbs
    assert dtp.src_export is None
    assert dtp.dest_import

    self.feedback_fn("%s is now listening, starting export" % dtp.data.name)

    # The import side is ready; fire up the matching export on the source
    # node, pointing it at the port the import daemon just opened
    exporter = DiskExport(self.lu, self.src_node, self.export_opts,
                          self.dest_ip, ie.listen_port,
                          self.instance, dtp.data.src_io, dtp.data.src_ioargs,
                          self.timeouts, self.src_cbs, private=dtp)
    ie.loop.Add(exporter)

    dtp.src_export = exporter

  def ReportConnected(self, ie, dtp):
    """Called when a connection has been established.

    """
    self.feedback_fn("%s is receiving data on %s" %
                     (dtp.data.name, self.dest_node))

  def ReportFinished(self, ie, dtp):
    """Called when a transfer has finished.

    """
    if ie.success:
      self.feedback_fn("%s finished receiving data" % dtp.data.name)
    else:
      self.feedback_fn("%s failed to receive data: %s (recent output: %r)" %
                       (dtp.data.name, ie.final_message, ie.recent_output))

    dtp.RecordResult(ie.success)

    # TODO: Check whether sending SIGTERM right away is okay, maybe we should
    # give the daemon a moment to sort things out
    if dtp.src_export and not ie.success:
      dtp.src_export.Abort()
919

    
920

    
921
class DiskTransfer(object):
  """Description of a single disk transfer.

  Plain data container; used as input to L{TransferInstanceData}.

  """
  def __init__(self, name, src_io, src_ioargs, dest_io, dest_ioargs,
               finished_fn):
    """Initializes this class.

    @type name: string
    @param name: User-visible name for this transfer (e.g. "disk/0")
    @param src_io: Source I/O type
    @param src_ioargs: Source I/O arguments
    @param dest_io: Destination I/O type
    @param dest_ioargs: Destination I/O arguments
    @type finished_fn: callable
    @param finished_fn: Function called once transfer has finished

    """
    self.name = name

    # Source side of the transfer
    self.src_io = src_io
    self.src_ioargs = src_ioargs

    # Destination side of the transfer
    self.dest_io = dest_io
    self.dest_ioargs = dest_ioargs

    self.finished_fn = finished_fn
945

    
946

    
947
class _DiskTransferPrivate(object):
948
  def __init__(self, data, success):
949
    """Initializes this class.
950

951
    @type data: L{DiskTransfer}
952
    @type success: bool
953

954
    """
955
    self.data = data
956

    
957
    self.src_export = None
958
    self.dest_import = None
959

    
960
    self.success = success
961

    
962
  def RecordResult(self, success):
963
    """Updates the status.
964

965
    One failed part will cause the whole transfer to fail.
966

967
    """
968
    self.success = self.success and success
969

    
970

    
971
def TransferInstanceData(lu, feedback_fn, src_node, dest_node, dest_ip,
                         instance, all_transfers):
  """Transfers an instance's data from one node to another.

  @param lu: Logical unit instance
  @param feedback_fn: Feedback function
  @type src_node: string
  @param src_node: Source node name
  @type dest_node: string
  @param dest_node: Destination node name
  @type dest_ip: string
  @param dest_ip: IP address of destination node
  @type instance: L{objects.Instance}
  @param instance: Instance object
  @type all_transfers: list of L{DiskTransfer} instances
  @param all_transfers: List of all disk transfers to be made
  @rtype: list
  @return: List with a boolean (True=successful, False=failed) for success for
           each transfer

  """
  # Compress only if transfer is to another node
  if src_node == dest_node:
    compress = constants.IEC_NONE
  else:
    compress = constants.IEC_GZIP

  logging.debug("Source node %s, destination node %s, compression '%s'",
                src_node, dest_node, compress)

  # Intra-cluster transfer, hence no X509 key name or CA
  opts = objects.ImportExportOptions(key_name=None, ca_pem=None,
                                     compress=compress)

  timeouts = ImportExportTimeouts(constants.DISK_TRANSFER_CONNECT_TIMEOUT)
  # The source callback object is handed to the destination callbacks so a
  # failing import can abort the matching export (see ReportFinished above)
  src_cbs = _TransferInstSourceCb(lu, feedback_fn, instance, timeouts,
                                  src_node, None, dest_node, dest_ip, opts)
  dest_cbs = _TransferInstDestCb(lu, feedback_fn, instance, timeouts,
                                 src_node, src_cbs, dest_node, dest_ip, opts)

  # One _DiskTransferPrivate per entry in all_transfers, in the same order,
  # also for entries that are skipped
  all_dtp = []

  ieloop = ImportExportLoop(lu)
  try:
    for transfer in all_transfers:
      if transfer:
        feedback_fn("Exporting %s from %s to %s" %
                    (transfer.name, src_node, dest_node))

        dtp = _DiskTransferPrivate(transfer, True)

        di = DiskImport(lu, dest_node, opts, instance,
                        transfer.dest_io, transfer.dest_ioargs,
                        timeouts, dest_cbs, private=dtp)
        ieloop.Add(di)

        dtp.dest_import = di
      else:
        # Placeholder for a skipped transfer; recorded as failed (False)
        dtp = _DiskTransferPrivate(None, False)

      all_dtp.append(dtp)

    ieloop.Run()
  finally:
    # Always clean up the daemons, even if the loop raised
    ieloop.FinalizeAll()

  assert len(all_dtp) == len(all_transfers)
  assert compat.all([(dtp.src_export is None or
                      dtp.src_export.success is not None) and
                     (dtp.dest_import is None or
                      dtp.dest_import.success is not None)
                     for dtp in all_dtp]), \
         "Not all imports/exports are finalized"

  return [bool(dtp.success) for dtp in all_dtp]
1047
class _RemoteExportCb(ImportExportCbBase):
  def __init__(self, feedback_fn, disk_count):
    """Initializes this class.

    @param feedback_fn: Feedback function
    @type disk_count: number
    @param disk_count: Number of disks being exported

    """
    ImportExportCbBase.__init__(self)
    self._feedback_fn = feedback_fn
    # One slot per disk; None means the transfer hasn't finished yet
    self._dresults = disk_count * [None]

  @property
  def disk_results(self):
    """Returns per-disk results.

    """
    return self._dresults

  def ReportConnected(self, ie, private):
    """Called when a connection has been established.

    """
    (idx, _) = private
    self._feedback_fn("Disk %s is now sending data" % idx)

  def ReportProgress(self, ie, private):
    """Called when new progress information should be reported.

    """
    (idx, _) = private

    cur_progress = ie.progress
    if cur_progress:
      self._feedback_fn("Disk %s sent %s" %
                        (idx, FormatProgress(cur_progress)))

  def ReportFinished(self, ie, private):
    """Called when a transfer has finished.

    """
    (idx, finished_fn) = private

    if ie.success:
      text = "Disk %s finished sending data" % idx
    else:
      text = ("Disk %s failed to send data: %s (recent output: %r)" %
              (idx, ie.final_message, ie.recent_output))
    self._feedback_fn(text)

    self._dresults[idx] = bool(ie.success)

    if finished_fn:
      finished_fn()
1101
class ExportInstanceHelper:
  def __init__(self, lu, feedback_fn, instance):
    """Initializes this class.

    @param lu: Logical unit instance
    @param feedback_fn: Feedback function
    @type instance: L{objects.Instance}
    @param instance: Instance object

    """
    self._lu = lu
    self._feedback_fn = feedback_fn
    self._instance = instance

    # Snapshot disk objects, filled in by CreateSnapshots; an entry is
    # False when snapshotting that disk failed
    self._snap_disks = []
    # Per-disk flag tracking which snapshots were already removed
    self._removed_snaps = [False] * len(instance.disks)

  def CreateSnapshots(self):
    """Creates an LVM snapshot for every disk of the instance.

    """
    # Must only be called once
    assert not self._snap_disks

    instance = self._instance
    src_node = instance.primary_node

    vgname = self._lu.cfg.GetVGName()

    for idx, disk in enumerate(instance.disks):
      self._feedback_fn("Creating a snapshot of disk/%s on node %s" %
                        (idx, src_node))

      # result.payload will be a snapshot of an lvm leaf of the one we
      # passed
      result = self._lu.rpc.call_blockdev_snapshot(src_node, disk)
      msg = result.fail_msg
      if msg:
        # A failed snapshot is not fatal; the disk is marked as missing
        # (False) and skipped by LocalExport later
        self._lu.LogWarning("Could not snapshot disk/%s on node %s: %s",
                            idx, src_node, msg)
        new_dev = False
      else:
        disk_id = (vgname, result.payload)
        new_dev = objects.Disk(dev_type=constants.LD_LV, size=disk.size,
                               logical_id=disk_id, physical_id=disk_id,
                               iv_name=disk.iv_name)

      self._snap_disks.append(new_dev)

    # Snapshot bookkeeping must stay aligned with the instance's disk list
    assert len(self._snap_disks) == len(instance.disks)
    assert len(self._removed_snaps) == len(instance.disks)

  def _RemoveSnapshot(self, disk_index):
    """Removes an LVM snapshot.

    @type disk_index: number
    @param disk_index: Index of the snapshot to be removed

    """
    disk = self._snap_disks[disk_index]
    # Only remove snapshots that exist, and each of them only once
    if disk and not self._removed_snaps[disk_index]:
      src_node = self._instance.primary_node

      self._feedback_fn("Removing snapshot of disk/%s on node %s" %
                        (disk_index, src_node))

      result = self._lu.rpc.call_blockdev_remove(src_node, disk)
      if result.fail_msg:
        # Failure is only logged; the flag stays False so Cleanup can retry
        self._lu.LogWarning("Could not remove snapshot for disk/%d from node"
                            " %s: %s", disk_index, src_node, result.fail_msg)
      else:
        self._removed_snaps[disk_index] = True

  def LocalExport(self, dest_node):
    """Intra-cluster instance export.

    @type dest_node: L{objects.Node}
    @param dest_node: Destination node
    @rtype: tuple
    @return: Finalization status (bool) and list of per-disk transfer results

    """
    instance = self._instance
    src_node = instance.primary_node

    # CreateSnapshots must have been called before
    assert len(self._snap_disks) == len(instance.disks)

    transfers = []

    for idx, dev in enumerate(self._snap_disks):
      if not dev:
        # No snapshot for this disk; keep the list aligned by disk index
        transfers.append(None)
        continue

      path = utils.PathJoin(constants.EXPORT_DIR, "%s.new" % instance.name,
                            dev.physical_id[1])

      # Remove the snapshot as soon as its transfer has finished
      finished_fn = compat.partial(self._TransferFinished, idx)

      # FIXME: pass debug option from opcode to backend
      dt = DiskTransfer("snapshot/%s" % idx,
                        constants.IEIO_SCRIPT, (dev, idx),
                        constants.IEIO_FILE, (path, ),
                        finished_fn)
      transfers.append(dt)

    # Actually export data
    dresults = TransferInstanceData(self._lu, self._feedback_fn,
                                    src_node, dest_node.name,
                                    dest_node.secondary_ip,
                                    instance, transfers)

    assert len(dresults) == len(instance.disks)

    self._feedback_fn("Finalizing export on %s" % dest_node.name)
    result = self._lu.rpc.call_finalize_export(dest_node.name, instance,
                                               self._snap_disks)
    msg = result.fail_msg
    fin_resu = not msg
    if msg:
      self._lu.LogWarning("Could not finalize export for instance %s"
                          " on node %s: %s", instance.name, dest_node.name, msg)

    return (fin_resu, dresults)

  def RemoteExport(self, opts, disk_info, timeouts):
    """Inter-cluster instance export.

    @type opts: L{objects.ImportExportOptions}
    @param opts: Import/export daemon options
    @type disk_info: list
    @param disk_info: Per-disk destination information
    @type timeouts: L{ImportExportTimeouts}
    @param timeouts: Timeouts for this import
    @rtype: tuple
    @return: Overall status (always True) and list of per-disk results

    """
    instance = self._instance

    assert len(disk_info) == len(instance.disks)

    cbs = _RemoteExportCb(self._feedback_fn, len(instance.disks))

    ieloop = ImportExportLoop(self._lu)
    try:
      # NOTE: exports instance.disks directly, not the snapshots;
      # _TransferFinished still removes the snapshot for the index
      for idx, (dev, (host, port)) in enumerate(zip(instance.disks,
                                                    disk_info)):
        self._feedback_fn("Sending disk %s to %s:%s" % (idx, host, port))
        finished_fn = compat.partial(self._TransferFinished, idx)
        ieloop.Add(DiskExport(self._lu, instance.primary_node,
                              opts, host, port, instance,
                              constants.IEIO_SCRIPT, (dev, idx),
                              timeouts, cbs, private=(idx, finished_fn)))

      ieloop.Run()
    finally:
      # Always clean up the daemons, even if the loop raised
      ieloop.FinalizeAll()

    return (True, cbs.disk_results)

  def _TransferFinished(self, idx):
    """Called once a transfer has finished.

    @type idx: number
    @param idx: Disk index

    """
    logging.debug("Transfer %s finished", idx)
    self._RemoveSnapshot(idx)

  def Cleanup(self):
    """Remove all snapshots.

    """
    assert len(self._removed_snaps) == len(self._instance.disks)
    # _RemoveSnapshot is a no-op for snapshots already removed
    for idx in range(len(self._instance.disks)):
      self._RemoveSnapshot(idx)
1276
class _RemoteImportCb(ImportExportCbBase):
  def __init__(self, feedback_fn, cds, x509_cert_pem, disk_count,
               external_address):
    """Initializes this class.

    @type cds: string
    @param cds: Cluster domain secret
    @type x509_cert_pem: string
    @param x509_cert_pem: CA used for signing import key
    @type disk_count: number
    @param disk_count: Number of disks
    @type external_address: string
    @param external_address: External address of destination node

    """
    ImportExportCbBase.__init__(self)
    self._feedback_fn = feedback_fn
    self._cds = cds
    self._x509_cert_pem = x509_cert_pem
    self._disk_count = disk_count
    self._external_address = external_address

    # Per-disk transfer results; None while still in progress
    self._dresults = [None] * disk_count
    # Listening port of each import daemon; None until it reports in
    # (and reset to None once the transfer finishes)
    self._daemon_port = [None] * disk_count

    # Salt used when signing the per-disk connection information
    self._salt = utils.GenerateSecret(8)

  @property
  def disk_results(self):
    """Returns per-disk results.

    """
    return self._dresults

  def _CheckAllListening(self):
    """Checks whether all daemons are listening.

    If all daemons are listening, the information is sent to the client.

    """
    # Wait until every daemon has reported its port
    if not compat.all(dp is not None for dp in self._daemon_port):
      return

    host = self._external_address

    disks = []
    for idx, port in enumerate(self._daemon_port):
      # Sign each disk's connection coordinates with the cluster domain
      # secret so the peer can verify them
      disks.append(ComputeRemoteImportDiskInfo(self._cds, self._salt,
                                               idx, host, port))

    assert len(disks) == self._disk_count

    # Publish the signed connection information as a remote-import event
    self._feedback_fn(constants.ELOG_REMOTE_IMPORT, {
      "disks": disks,
      "x509_ca": self._x509_cert_pem,
      })

  def ReportListening(self, ie, private):
    """Called when daemon started listening.

    """
    (idx, ) = private

    self._feedback_fn("Disk %s is now listening" % idx)

    # Each daemon must report its port exactly once
    assert self._daemon_port[idx] is None

    self._daemon_port[idx] = ie.listen_port

    self._CheckAllListening()

  def ReportConnected(self, ie, private):
    """Called when a connection has been established.

    """
    (idx, ) = private

    self._feedback_fn("Disk %s is now receiving data" % idx)

  def ReportFinished(self, ie, private):
    """Called when a transfer has finished.

    """
    (idx, ) = private

    # Daemon is certainly no longer listening
    self._daemon_port[idx] = None

    if ie.success:
      self._feedback_fn("Disk %s finished receiving data" % idx)
    else:
      self._feedback_fn(("Disk %s failed to receive data: %s"
                         " (recent output: %r)") %
                        (idx, ie.final_message, ie.recent_output))

    self._dresults[idx] = bool(ie.success)
1374
def RemoteImport(lu, feedback_fn, instance, source_x509_ca, cds, timeouts):
  """Imports an instance from another cluster.

  @param lu: Logical unit instance
  @param feedback_fn: Feedback function
  @type instance: L{objects.Instance}
  @param instance: Instance object
  @type source_x509_ca: OpenSSL.crypto.X509
  @param source_x509_ca: Import source's X509 CA
  @type cds: string
  @param cds: Cluster domain secret
  @type timeouts: L{ImportExportTimeouts}
  @param timeouts: Timeouts for this import
  @rtype: list
  @return: Per-disk import results (see L{_RemoteImportCb.disk_results})

  """
  source_ca_pem = OpenSSL.crypto.dump_certificate(OpenSSL.crypto.FILETYPE_PEM,
                                                  source_x509_ca)

  # Create crypto key
  result = lu.rpc.call_x509_cert_create(instance.primary_node,
                                        constants.RIE_CERT_VALIDITY)
  result.Raise("Can't create X509 key and certificate on %s" % result.node)

  (x509_key_name, x509_cert_pem) = result.payload
  try:
    # Load certificate
    x509_cert = OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_PEM,
                                                x509_cert_pem)

    # Import daemon options
    opts = objects.ImportExportOptions(key_name=x509_key_name,
                                       ca_pem=source_ca_pem)

    # Sign certificate
    signed_x509_cert_pem = \
      utils.SignX509Certificate(x509_cert, cds, utils.GenerateSecret(8))

    cbs = _RemoteImportCb(feedback_fn, cds, signed_x509_cert_pem,
                          len(instance.disks), instance.primary_node)

    ieloop = ImportExportLoop(lu)
    try:
      # One import daemon per disk
      for idx, dev in enumerate(instance.disks):
        ieloop.Add(DiskImport(lu, instance.primary_node, opts, instance,
                              constants.IEIO_SCRIPT, (dev, idx),
                              timeouts, cbs, private=(idx, )))

      ieloop.Run()
    finally:
      # Always clean up the daemons, even if the loop raised
      ieloop.FinalizeAll()
  finally:
    # Remove crypto key and certificate
    result = lu.rpc.call_x509_cert_remove(instance.primary_node, x509_key_name)
    result.Raise("Can't remove X509 key and certificate on %s" % result.node)

  return cbs.disk_results
1432
def _GetImportExportHandshakeMessage(version):
  """Returns the handshake message for a RIE protocol version.

  @type version: number
  @param version: Remote import/export protocol version

  """
  # The HMAC in the handshake is computed over this text
  parts = (version, constants.RIE_HANDSHAKE)
  return "%s:%s" % parts
1441
def ComputeRemoteExportHandshake(cds):
  """Computes the remote import/export handshake.

  @type cds: string
  @param cds: Cluster domain secret

  """
  # Fresh salt for every handshake
  hs_salt = utils.GenerateSecret(8)
  hs_msg = _GetImportExportHandshakeMessage(constants.RIE_VERSION)
  hs_digest = utils.Sha1Hmac(cds, hs_msg, salt=hs_salt)
  return (constants.RIE_VERSION, hs_digest, hs_salt)
1453
def CheckRemoteExportHandshake(cds, handshake):
1454
  """Checks the handshake of a remote import/export.
1455

1456
  @type cds: string
1457
  @param cds: Cluster domain secret
1458
  @type handshake: sequence
1459
  @param handshake: Handshake sent by remote peer
1460

1461
  """
1462
  try:
1463
    (version, hmac_digest, hmac_salt) = handshake
1464
  except (TypeError, ValueError), err:
1465
    return "Invalid data: %s" % err
1466

    
1467
  if not utils.VerifySha1Hmac(cds, _GetImportExportHandshakeMessage(version),
1468
                              hmac_digest, salt=hmac_salt):
1469
    return "Hash didn't match, clusters don't share the same domain secret"
1470

    
1471
  if version != constants.RIE_VERSION:
1472
    return ("Clusters don't have the same remote import/export protocol"
1473
            " (local=%s, remote=%s)" %
1474
            (constants.RIE_VERSION, version))
1475

    
1476
  return None
1477

    
1478

    
1479
def _GetRieDiskInfoMessage(disk_index, host, port):
1480
  """Returns the hashed text for import/export disk information.
1481

1482
  @type disk_index: number
1483
  @param disk_index: Index of disk (included in hash)
1484
  @type host: string
1485
  @param host: Hostname
1486
  @type port: number
1487
  @param port: Daemon port
1488

1489
  """
1490
  return "%s:%s:%s" % (disk_index, host, port)
1491

    
1492

    
1493
def CheckRemoteExportDiskInfo(cds, disk_index, disk_info):
  """Verifies received disk information for an export.

  @type cds: string
  @param cds: Cluster domain secret
  @type disk_index: number
  @param disk_index: Index of disk (included in hash)
  @type disk_info: sequence
  @param disk_info: Disk information sent by remote peer
  @rtype: tuple
  @return: Normalized host name and validated service port
  @raise errors.GenericError: If the data is malformed, incomplete or the
    HMAC doesn't verify

  """
  try:
    (host, port, hmac_digest, hmac_salt) = disk_info
  except (TypeError, ValueError), err:
    raise errors.GenericError("Invalid data: %s" % err)

  if not (host and port):
    raise errors.GenericError("Missing destination host or port")

  msg = _GetRieDiskInfoMessage(disk_index, host, port)

  # The HMAC proves the peer knows the shared cluster domain secret
  if not utils.VerifySha1Hmac(cds, msg, hmac_digest, salt=hmac_salt):
    raise errors.GenericError("HMAC is wrong")

  return (utils.HostInfo.NormalizeName(host),
          utils.ValidateServiceName(port))
def ComputeRemoteImportDiskInfo(cds, salt, disk_index, host, port):
  """Computes the signed disk information for a remote import.

  @type cds: string
  @param cds: Cluster domain secret
  @type salt: string
  @param salt: HMAC salt
  @type disk_index: number
  @param disk_index: Index of disk (included in hash)
  @type host: string
  @param host: Hostname
  @type port: number
  @param port: Daemon port

  """
  # Sign the disk coordinates with the cluster domain secret so the peer
  # can verify them (see CheckRemoteExportDiskInfo)
  signed_msg = _GetRieDiskInfoMessage(disk_index, host, port)
  signed_digest = utils.Sha1Hmac(cds, signed_msg, salt=salt)
  return (host, port, signed_digest, salt)