Statistics
| Branch: | Tag: | Revision:

root / lib / masterd / instance.py @ 1a2e7fe9

History | View | Annotate | Download (41.8 kB)

1
#
2
#
3

    
4
# Copyright (C) 2010 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Instance-related functions and classes for masterd.
23

24
"""
25

    
26
import logging
27
import time
28
import OpenSSL
29

    
30
from ganeti import constants
31
from ganeti import errors
32
from ganeti import compat
33
from ganeti import utils
34
from ganeti import objects
35

    
36

    
37
class _ImportExportError(Exception):
38
  """Local exception to report import/export errors.
39

40
  """
41

    
42

    
43
class ImportExportTimeouts(object):
44
  #: Time until daemon starts writing status file
45
  DEFAULT_READY_TIMEOUT = 10
46

    
47
  #: Length of time until errors cause hard failure
48
  DEFAULT_ERROR_TIMEOUT = 10
49

    
50
  #: Time after which daemon must be listening
51
  DEFAULT_LISTEN_TIMEOUT = 10
52

    
53
  #: Progress update interval
54
  DEFAULT_PROGRESS_INTERVAL = 60
55

    
56
  __slots__ = [
57
    "error",
58
    "ready",
59
    "listen",
60
    "connect",
61
    "progress",
62
    ]
63

    
64
  def __init__(self, connect,
65
               listen=DEFAULT_LISTEN_TIMEOUT,
66
               error=DEFAULT_ERROR_TIMEOUT,
67
               ready=DEFAULT_READY_TIMEOUT,
68
               progress=DEFAULT_PROGRESS_INTERVAL):
69
    """Initializes this class.
70

71
    @type connect: number
72
    @param connect: Timeout for establishing connection
73
    @type listen: number
74
    @param listen: Timeout for starting to listen for connections
75
    @type error: number
76
    @param error: Length of time until errors cause hard failure
77
    @type ready: number
78
    @param ready: Timeout for daemon to become ready
79
    @type progress: number
80
    @param progress: Progress update interval
81

82
    """
83
    self.error = error
84
    self.ready = ready
85
    self.listen = listen
86
    self.connect = connect
87
    self.progress = progress
88

    
89

    
90
class ImportExportCbBase(object):
91
  """Callbacks for disk import/export.
92

93
  """
94
  def ReportListening(self, ie, private):
95
    """Called when daemon started listening.
96

97
    @type ie: Subclass of L{_DiskImportExportBase}
98
    @param ie: Import/export object
99
    @param private: Private data passed to import/export object
100

101
    """
102

    
103
  def ReportConnected(self, ie, private):
104
    """Called when a connection has been established.
105

106
    @type ie: Subclass of L{_DiskImportExportBase}
107
    @param ie: Import/export object
108
    @param private: Private data passed to import/export object
109

110
    """
111

    
112
  def ReportProgress(self, ie, private):
113
    """Called when new progress information should be reported.
114

115
    @type ie: Subclass of L{_DiskImportExportBase}
116
    @param ie: Import/export object
117
    @param private: Private data passed to import/export object
118

119
    """
120

    
121
  def ReportFinished(self, ie, private):
122
    """Called when a transfer has finished.
123

124
    @type ie: Subclass of L{_DiskImportExportBase}
125
    @param ie: Import/export object
126
    @param private: Private data passed to import/export object
127

128
    """
129

    
130

    
131
def _TimeoutExpired(epoch, timeout, _time_fn=time.time):
132
  """Checks whether a timeout has expired.
133

134
  """
135
  return _time_fn() > (epoch + timeout)
136

    
137

    
138
class _DiskImportExportBase(object):
139
  MODE_TEXT = None
140

    
141
  def __init__(self, lu, node_name, opts,
142
               instance, timeouts, cbs, private=None):
143
    """Initializes this class.
144

145
    @param lu: Logical unit instance
146
    @type node_name: string
147
    @param node_name: Node name for import
148
    @type opts: L{objects.ImportExportOptions}
149
    @param opts: Import/export daemon options
150
    @type instance: L{objects.Instance}
151
    @param instance: Instance object
152
    @type timeouts: L{ImportExportTimeouts}
153
    @param timeouts: Timeouts for this import
154
    @type cbs: L{ImportExportCbBase}
155
    @param cbs: Callbacks
156
    @param private: Private data for callback functions
157

158
    """
159
    assert self.MODE_TEXT
160

    
161
    self._lu = lu
162
    self.node_name = node_name
163
    self._opts = opts
164
    self._instance = instance
165
    self._timeouts = timeouts
166
    self._cbs = cbs
167
    self._private = private
168

    
169
    # Parent loop
170
    self._loop = None
171

    
172
    # Timestamps
173
    self._ts_begin = None
174
    self._ts_connected = None
175
    self._ts_finished = None
176
    self._ts_cleanup = None
177
    self._ts_last_progress = None
178
    self._ts_last_error = None
179

    
180
    # Transfer status
181
    self.success = None
182
    self.final_message = None
183

    
184
    # Daemon status
185
    self._daemon_name = None
186
    self._daemon = None
187

    
188
  @property
189
  def recent_output(self):
190
    """Returns the most recent output from the daemon.
191

192
    """
193
    if self._daemon:
194
      return self._daemon.recent_output
195

    
196
    return None
197

    
198
  @property
199
  def progress(self):
200
    """Returns transfer progress information.
201

202
    """
203
    if not self._daemon:
204
      return None
205

    
206
    return (self._daemon.progress_mbytes,
207
            self._daemon.progress_throughput,
208
            self._daemon.progress_percent,
209
            self._daemon.progress_eta)
210

    
211
  @property
212
  def active(self):
213
    """Determines whether this transport is still active.
214

215
    """
216
    return self.success is None
217

    
218
  @property
219
  def loop(self):
220
    """Returns parent loop.
221

222
    @rtype: L{ImportExportLoop}
223

224
    """
225
    return self._loop
226

    
227
  def SetLoop(self, loop):
228
    """Sets the parent loop.
229

230
    @type loop: L{ImportExportLoop}
231

232
    """
233
    if self._loop:
234
      raise errors.ProgrammerError("Loop can only be set once")
235

    
236
    self._loop = loop
237

    
238
  def _StartDaemon(self):
239
    """Starts the import/export daemon.
240

241
    """
242
    raise NotImplementedError()
243

    
244
  def CheckDaemon(self):
245
    """Checks whether daemon has been started and if not, starts it.
246

247
    @rtype: string
248
    @return: Daemon name
249

250
    """
251
    assert self._ts_cleanup is None
252

    
253
    if self._daemon_name is None:
254
      assert self._ts_begin is None
255

    
256
      result = self._StartDaemon()
257
      if result.fail_msg:
258
        raise _ImportExportError("Failed to start %s on %s: %s" %
259
                                 (self.MODE_TEXT, self.node_name,
260
                                  result.fail_msg))
261

    
262
      daemon_name = result.payload
263

    
264
      logging.info("Started %s %r on %s", self.MODE_TEXT, daemon_name,
265
                   self.node_name)
266

    
267
      self._ts_begin = time.time()
268
      self._daemon_name = daemon_name
269

    
270
    return self._daemon_name
271

    
272
  def GetDaemonName(self):
273
    """Returns the daemon name.
274

275
    """
276
    assert self._daemon_name, "Daemon has not been started"
277
    assert self._ts_cleanup is None
278
    return self._daemon_name
279

    
280
  def Abort(self):
281
    """Sends SIGTERM to import/export daemon (if still active).
282

283
    """
284
    if self._daemon_name:
285
      self._lu.LogWarning("Aborting %s %r on %s",
286
                          self.MODE_TEXT, self._daemon_name, self.node_name)
287
      result = self._lu.rpc.call_impexp_abort(self.node_name, self._daemon_name)
288
      if result.fail_msg:
289
        self._lu.LogWarning("Failed to abort %s %r on %s: %s",
290
                            self.MODE_TEXT, self._daemon_name,
291
                            self.node_name, result.fail_msg)
292
        return False
293

    
294
    return True
295

    
296
  def _SetDaemonData(self, data):
297
    """Internal function for updating status daemon data.
298

299
    @type data: L{objects.ImportExportStatus}
300
    @param data: Daemon status data
301

302
    """
303
    assert self._ts_begin is not None
304

    
305
    if not data:
306
      if _TimeoutExpired(self._ts_begin, self._timeouts.ready):
307
        raise _ImportExportError("Didn't become ready after %s seconds" %
308
                                 self._timeouts.ready)
309

    
310
      return False
311

    
312
    self._daemon = data
313

    
314
    return True
315

    
316
  def SetDaemonData(self, success, data):
317
    """Updates daemon status data.
318

319
    @type success: bool
320
    @param success: Whether fetching data was successful or not
321
    @type data: L{objects.ImportExportStatus}
322
    @param data: Daemon status data
323

324
    """
325
    if not success:
326
      if self._ts_last_error is None:
327
        self._ts_last_error = time.time()
328

    
329
      elif _TimeoutExpired(self._ts_last_error, self._timeouts.error):
330
        raise _ImportExportError("Too many errors while updating data")
331

    
332
      return False
333

    
334
    self._ts_last_error = None
335

    
336
    return self._SetDaemonData(data)
337

    
338
  def CheckListening(self):
339
    """Checks whether the daemon is listening.
340

341
    """
342
    raise NotImplementedError()
343

    
344
  def _GetConnectedCheckEpoch(self):
345
    """Returns timeout to calculate connect timeout.
346

347
    """
348
    raise NotImplementedError()
349

    
350
  def CheckConnected(self):
351
    """Checks whether the daemon is connected.
352

353
    @rtype: bool
354
    @return: Whether the daemon is connected
355

356
    """
357
    assert self._daemon, "Daemon status missing"
358

    
359
    if self._ts_connected is not None:
360
      return True
361

    
362
    if self._daemon.connected:
363
      self._ts_connected = time.time()
364

    
365
      # TODO: Log remote peer
366
      logging.debug("%s %r on %s is now connected",
367
                    self.MODE_TEXT, self._daemon_name, self.node_name)
368

    
369
      self._cbs.ReportConnected(self, self._private)
370

    
371
      return True
372

    
373
    if _TimeoutExpired(self._GetConnectedCheckEpoch(), self._timeouts.connect):
374
      raise _ImportExportError("Not connected after %s seconds" %
375
                               self._timeouts.connect)
376

    
377
    return False
378

    
379
  def _CheckProgress(self):
380
    """Checks whether a progress update should be reported.
381

382
    """
383
    if ((self._ts_last_progress is None or
384
         _TimeoutExpired(self._ts_last_progress, self._timeouts.progress)) and
385
        self._daemon and
386
        self._daemon.progress_mbytes is not None and
387
        self._daemon.progress_throughput is not None):
388
      self._cbs.ReportProgress(self, self._private)
389
      self._ts_last_progress = time.time()
390

    
391
  def CheckFinished(self):
392
    """Checks whether the daemon exited.
393

394
    @rtype: bool
395
    @return: Whether the transfer is finished
396

397
    """
398
    assert self._daemon, "Daemon status missing"
399

    
400
    if self._ts_finished:
401
      return True
402

    
403
    if self._daemon.exit_status is None:
404
      # TODO: Adjust delay for ETA expiring soon
405
      self._CheckProgress()
406
      return False
407

    
408
    self._ts_finished = time.time()
409

    
410
    self._ReportFinished(self._daemon.exit_status == 0,
411
                         self._daemon.error_message)
412

    
413
    return True
414

    
415
  def _ReportFinished(self, success, message):
416
    """Transfer is finished or daemon exited.
417

418
    @type success: bool
419
    @param success: Whether the transfer was successful
420
    @type message: string
421
    @param message: Error message
422

423
    """
424
    assert self.success is None
425

    
426
    self.success = success
427
    self.final_message = message
428

    
429
    if success:
430
      logging.info("%s %r on %s succeeded", self.MODE_TEXT, self._daemon_name,
431
                   self.node_name)
432
    elif self._daemon_name:
433
      self._lu.LogWarning("%s %r on %s failed: %s",
434
                          self.MODE_TEXT, self._daemon_name, self.node_name,
435
                          message)
436
    else:
437
      self._lu.LogWarning("%s on %s failed: %s", self.MODE_TEXT,
438
                          self.node_name, message)
439

    
440
    self._cbs.ReportFinished(self, self._private)
441

    
442
  def _Finalize(self):
443
    """Makes the RPC call to finalize this import/export.
444

445
    """
446
    return self._lu.rpc.call_impexp_cleanup(self.node_name, self._daemon_name)
447

    
448
  def Finalize(self, error=None):
449
    """Finalizes this import/export.
450

451
    """
452
    if self._daemon_name:
453
      logging.info("Finalizing %s %r on %s",
454
                   self.MODE_TEXT, self._daemon_name, self.node_name)
455

    
456
      result = self._Finalize()
457
      if result.fail_msg:
458
        self._lu.LogWarning("Failed to finalize %s %r on %s: %s",
459
                            self.MODE_TEXT, self._daemon_name,
460
                            self.node_name, result.fail_msg)
461
        return False
462

    
463
      # Daemon is no longer running
464
      self._daemon_name = None
465
      self._ts_cleanup = time.time()
466

    
467
    if error:
468
      self._ReportFinished(False, error)
469

    
470
    return True
471

    
472

    
473
class DiskImport(_DiskImportExportBase):
474
  MODE_TEXT = "import"
475

    
476
  def __init__(self, lu, node_name, opts, instance,
477
               dest, dest_args, timeouts, cbs, private=None):
478
    """Initializes this class.
479

480
    @param lu: Logical unit instance
481
    @type node_name: string
482
    @param node_name: Node name for import
483
    @type opts: L{objects.ImportExportOptions}
484
    @param opts: Import/export daemon options
485
    @type instance: L{objects.Instance}
486
    @param instance: Instance object
487
    @param dest: I/O destination
488
    @param dest_args: I/O arguments
489
    @type timeouts: L{ImportExportTimeouts}
490
    @param timeouts: Timeouts for this import
491
    @type cbs: L{ImportExportCbBase}
492
    @param cbs: Callbacks
493
    @param private: Private data for callback functions
494

495
    """
496
    _DiskImportExportBase.__init__(self, lu, node_name, opts,
497
                                   instance, timeouts, cbs, private)
498
    self._dest = dest
499
    self._dest_args = dest_args
500

    
501
    # Timestamps
502
    self._ts_listening = None
503

    
504
  @property
505
  def listen_port(self):
506
    """Returns the port the daemon is listening on.
507

508
    """
509
    if self._daemon:
510
      return self._daemon.listen_port
511

    
512
    return None
513

    
514
  def _StartDaemon(self):
515
    """Starts the import daemon.
516

517
    """
518
    return self._lu.rpc.call_import_start(self.node_name, self._opts,
519
                                          self._instance,
520
                                          self._dest, self._dest_args)
521

    
522
  def CheckListening(self):
523
    """Checks whether the daemon is listening.
524

525
    @rtype: bool
526
    @return: Whether the daemon is listening
527

528
    """
529
    assert self._daemon, "Daemon status missing"
530

    
531
    if self._ts_listening is not None:
532
      return True
533

    
534
    port = self._daemon.listen_port
535
    if port is not None:
536
      self._ts_listening = time.time()
537

    
538
      logging.debug("Import %r on %s is now listening on port %s",
539
                    self._daemon_name, self.node_name, port)
540

    
541
      self._cbs.ReportListening(self, self._private)
542

    
543
      return True
544

    
545
    if _TimeoutExpired(self._ts_begin, self._timeouts.listen):
546
      raise _ImportExportError("Not listening after %s seconds" %
547
                               self._timeouts.listen)
548

    
549
    return False
550

    
551
  def _GetConnectedCheckEpoch(self):
552
    """Returns the time since we started listening.
553

554
    """
555
    assert self._ts_listening is not None, \
556
           ("Checking whether an import is connected is only useful"
557
            " once it's been listening")
558

    
559
    return self._ts_listening
560

    
561

    
562
class DiskExport(_DiskImportExportBase):
563
  MODE_TEXT = "export"
564

    
565
  def __init__(self, lu, node_name, opts,
566
               dest_host, dest_port, instance, source, source_args,
567
               timeouts, cbs, private=None):
568
    """Initializes this class.
569

570
    @param lu: Logical unit instance
571
    @type node_name: string
572
    @param node_name: Node name for import
573
    @type opts: L{objects.ImportExportOptions}
574
    @param opts: Import/export daemon options
575
    @type dest_host: string
576
    @param dest_host: Destination host name or IP address
577
    @type dest_port: number
578
    @param dest_port: Destination port number
579
    @type instance: L{objects.Instance}
580
    @param instance: Instance object
581
    @param source: I/O source
582
    @param source_args: I/O source
583
    @type timeouts: L{ImportExportTimeouts}
584
    @param timeouts: Timeouts for this import
585
    @type cbs: L{ImportExportCbBase}
586
    @param cbs: Callbacks
587
    @param private: Private data for callback functions
588

589
    """
590
    _DiskImportExportBase.__init__(self, lu, node_name, opts,
591
                                   instance, timeouts, cbs, private)
592
    self._dest_host = dest_host
593
    self._dest_port = dest_port
594
    self._source = source
595
    self._source_args = source_args
596

    
597
  def _StartDaemon(self):
598
    """Starts the export daemon.
599

600
    """
601
    return self._lu.rpc.call_export_start(self.node_name, self._opts,
602
                                          self._dest_host, self._dest_port,
603
                                          self._instance, self._source,
604
                                          self._source_args)
605

    
606
  def CheckListening(self):
607
    """Checks whether the daemon is listening.
608

609
    """
610
    # Only an import can be listening
611
    return True
612

    
613
  def _GetConnectedCheckEpoch(self):
614
    """Returns the time since the daemon started.
615

616
    """
617
    assert self._ts_begin is not None
618

    
619
    return self._ts_begin
620

    
621

    
622
def FormatProgress(progress):
623
  """Formats progress information for user consumption
624

625
  """
626
  (mbytes, throughput, percent, _) = progress
627

    
628
  parts = [
629
    utils.FormatUnit(mbytes, "h"),
630

    
631
    # Not using FormatUnit as it doesn't support kilobytes
632
    "%0.1f MiB/s" % throughput,
633
    ]
634

    
635
  if percent is not None:
636
    parts.append("%d%%" % percent)
637

    
638
  # TODO: Format ETA
639

    
640
  return utils.CommaJoin(parts)
641

    
642

    
643
class ImportExportLoop:
644
  MIN_DELAY = 1.0
645
  MAX_DELAY = 20.0
646

    
647
  def __init__(self, lu):
648
    """Initializes this class.
649

650
    """
651
    self._lu = lu
652
    self._queue = []
653
    self._pending_add = []
654

    
655
  def Add(self, diskie):
656
    """Adds an import/export object to the loop.
657

658
    @type diskie: Subclass of L{_DiskImportExportBase}
659
    @param diskie: Import/export object
660

661
    """
662
    assert diskie not in self._pending_add
663
    assert diskie.loop is None
664

    
665
    diskie.SetLoop(self)
666

    
667
    # Adding new objects to a staging list is necessary, otherwise the main
668
    # loop gets confused if callbacks modify the queue while the main loop is
669
    # iterating over it.
670
    self._pending_add.append(diskie)
671

    
672
  @staticmethod
673
  def _CollectDaemonStatus(lu, daemons):
674
    """Collects the status for all import/export daemons.
675

676
    """
677
    daemon_status = {}
678

    
679
    for node_name, names in daemons.iteritems():
680
      result = lu.rpc.call_impexp_status(node_name, names)
681
      if result.fail_msg:
682
        lu.LogWarning("Failed to get daemon status on %s: %s",
683
                      node_name, result.fail_msg)
684
        continue
685

    
686
      assert len(names) == len(result.payload)
687

    
688
      daemon_status[node_name] = dict(zip(names, result.payload))
689

    
690
    return daemon_status
691

    
692
  @staticmethod
693
  def _GetActiveDaemonNames(queue):
694
    """Gets the names of all active daemons.
695

696
    """
697
    result = {}
698
    for diskie in queue:
699
      if not diskie.active:
700
        continue
701

    
702
      try:
703
        # Start daemon if necessary
704
        daemon_name = diskie.CheckDaemon()
705
      except _ImportExportError, err:
706
        logging.exception("%s failed", diskie.MODE_TEXT)
707
        diskie.Finalize(error=str(err))
708
        continue
709

    
710
      result.setdefault(diskie.node_name, []).append(daemon_name)
711

    
712
    assert len(queue) >= len(result)
713
    assert len(queue) >= sum([len(names) for names in result.itervalues()])
714

    
715
    logging.debug("daemons=%r", result)
716

    
717
    return result
718

    
719
  def _AddPendingToQueue(self):
720
    """Adds all pending import/export objects to the internal queue.
721

722
    """
723
    assert compat.all(diskie not in self._queue and diskie.loop == self
724
                      for diskie in self._pending_add)
725

    
726
    self._queue.extend(self._pending_add)
727

    
728
    del self._pending_add[:]
729

    
730
  def Run(self):
731
    """Utility main loop.
732

733
    """
734
    while True:
735
      self._AddPendingToQueue()
736

    
737
      # Collect all active daemon names
738
      daemons = self._GetActiveDaemonNames(self._queue)
739
      if not daemons:
740
        break
741

    
742
      # Collection daemon status data
743
      data = self._CollectDaemonStatus(self._lu, daemons)
744

    
745
      # Use data
746
      delay = self.MAX_DELAY
747
      for diskie in self._queue:
748
        if not diskie.active:
749
          continue
750

    
751
        try:
752
          try:
753
            all_daemon_data = data[diskie.node_name]
754
          except KeyError:
755
            result = diskie.SetDaemonData(False, None)
756
          else:
757
            result = \
758
              diskie.SetDaemonData(True,
759
                                   all_daemon_data[diskie.GetDaemonName()])
760

    
761
          if not result:
762
            # Daemon not yet ready, retry soon
763
            delay = min(3.0, delay)
764
            continue
765

    
766
          if diskie.CheckFinished():
767
            # Transfer finished
768
            diskie.Finalize()
769
            continue
770

    
771
          # Normal case: check again in 5 seconds
772
          delay = min(5.0, delay)
773

    
774
          if not diskie.CheckListening():
775
            # Not yet listening, retry soon
776
            delay = min(1.0, delay)
777
            continue
778

    
779
          if not diskie.CheckConnected():
780
            # Not yet connected, retry soon
781
            delay = min(1.0, delay)
782
            continue
783

    
784
        except _ImportExportError, err:
785
          logging.exception("%s failed", diskie.MODE_TEXT)
786
          diskie.Finalize(error=str(err))
787

    
788
      if not compat.any([diskie.active for diskie in self._queue]):
789
        break
790

    
791
      # Wait a bit
792
      delay = min(self.MAX_DELAY, max(self.MIN_DELAY, delay))
793
      logging.debug("Waiting for %ss", delay)
794
      time.sleep(delay)
795

    
796
  def FinalizeAll(self):
797
    """Finalizes all pending transfers.
798

799
    """
800
    success = True
801

    
802
    for diskie in self._queue:
803
      success = diskie.Finalize() and success
804

    
805
    return success
806

    
807

    
808
class _TransferInstCbBase(ImportExportCbBase):
809
  def __init__(self, lu, feedback_fn, instance, timeouts, src_node, src_cbs,
810
               dest_node, dest_ip, export_opts):
811
    """Initializes this class.
812

813
    """
814
    ImportExportCbBase.__init__(self)
815

    
816
    self.lu = lu
817
    self.feedback_fn = feedback_fn
818
    self.instance = instance
819
    self.timeouts = timeouts
820
    self.src_node = src_node
821
    self.src_cbs = src_cbs
822
    self.dest_node = dest_node
823
    self.dest_ip = dest_ip
824
    self.export_opts = export_opts
825

    
826

    
827
class _TransferInstSourceCb(_TransferInstCbBase):
828
  def ReportConnected(self, ie, dtp):
829
    """Called when a connection has been established.
830

831
    """
832
    assert self.src_cbs is None
833
    assert dtp.src_export == ie
834
    assert dtp.dest_import
835

    
836
    self.feedback_fn("%s is sending data on %s" %
837
                     (dtp.data.name, ie.node_name))
838

    
839
  def ReportProgress(self, ie, dtp):
840
    """Called when new progress information should be reported.
841

842
    """
843
    progress = ie.progress
844
    if not progress:
845
      return
846

    
847
    self.feedback_fn("%s sent %s" % (dtp.data.name, FormatProgress(progress)))
848

    
849
  def ReportFinished(self, ie, dtp):
850
    """Called when a transfer has finished.
851

852
    """
853
    assert self.src_cbs is None
854
    assert dtp.src_export == ie
855
    assert dtp.dest_import
856

    
857
    if ie.success:
858
      self.feedback_fn("%s finished sending data" % dtp.data.name)
859
    else:
860
      self.feedback_fn("%s failed to send data: %s (recent output: %r)" %
861
                       (dtp.data.name, ie.final_message, ie.recent_output))
862

    
863
    dtp.RecordResult(ie.success)
864

    
865
    cb = dtp.data.finished_fn
866
    if cb:
867
      cb()
868

    
869
    # TODO: Check whether sending SIGTERM right away is okay, maybe we should
870
    # give the daemon a moment to sort things out
871
    if dtp.dest_import and not ie.success:
872
      dtp.dest_import.Abort()
873

    
874

    
875
class _TransferInstDestCb(_TransferInstCbBase):
876
  def ReportListening(self, ie, dtp):
877
    """Called when daemon started listening.
878

879
    """
880
    assert self.src_cbs
881
    assert dtp.src_export is None
882
    assert dtp.dest_import
883

    
884
    self.feedback_fn("%s is now listening, starting export" % dtp.data.name)
885

    
886
    # Start export on source node
887
    de = DiskExport(self.lu, self.src_node, self.export_opts,
888
                    self.dest_ip, ie.listen_port,
889
                    self.instance, dtp.data.src_io, dtp.data.src_ioargs,
890
                    self.timeouts, self.src_cbs, private=dtp)
891
    ie.loop.Add(de)
892

    
893
    dtp.src_export = de
894

    
895
  def ReportConnected(self, ie, dtp):
896
    """Called when a connection has been established.
897

898
    """
899
    self.feedback_fn("%s is receiving data on %s" %
900
                     (dtp.data.name, self.dest_node))
901

    
902
  def ReportFinished(self, ie, dtp):
903
    """Called when a transfer has finished.
904

905
    """
906
    if ie.success:
907
      self.feedback_fn("%s finished receiving data" % dtp.data.name)
908
    else:
909
      self.feedback_fn("%s failed to receive data: %s (recent output: %r)" %
910
                       (dtp.data.name, ie.final_message, ie.recent_output))
911

    
912
    dtp.RecordResult(ie.success)
913

    
914
    # TODO: Check whether sending SIGTERM right away is okay, maybe we should
915
    # give the daemon a moment to sort things out
916
    if dtp.src_export and not ie.success:
917
      dtp.src_export.Abort()
918

    
919

    
920
class DiskTransfer(object):
921
  def __init__(self, name, src_io, src_ioargs, dest_io, dest_ioargs,
922
               finished_fn):
923
    """Initializes this class.
924

925
    @type name: string
926
    @param name: User-visible name for this transfer (e.g. "disk/0")
927
    @param src_io: Source I/O type
928
    @param src_ioargs: Source I/O arguments
929
    @param dest_io: Destination I/O type
930
    @param dest_ioargs: Destination I/O arguments
931
    @type finished_fn: callable
932
    @param finished_fn: Function called once transfer has finished
933

934
    """
935
    self.name = name
936

    
937
    self.src_io = src_io
938
    self.src_ioargs = src_ioargs
939

    
940
    self.dest_io = dest_io
941
    self.dest_ioargs = dest_ioargs
942

    
943
    self.finished_fn = finished_fn
944

    
945

    
946
class _DiskTransferPrivate(object):
947
  def __init__(self, data, success):
948
    """Initializes this class.
949

950
    @type data: L{DiskTransfer}
951
    @type success: bool
952

953
    """
954
    self.data = data
955

    
956
    self.src_export = None
957
    self.dest_import = None
958

    
959
    self.success = success
960

    
961
  def RecordResult(self, success):
962
    """Updates the status.
963

964
    One failed part will cause the whole transfer to fail.
965

966
    """
967
    self.success = self.success and success
968

    
969

    
970
def TransferInstanceData(lu, feedback_fn, src_node, dest_node, dest_ip,
971
                         instance, all_transfers):
972
  """Transfers an instance's data from one node to another.
973

974
  @param lu: Logical unit instance
975
  @param feedback_fn: Feedback function
976
  @type src_node: string
977
  @param src_node: Source node name
978
  @type dest_node: string
979
  @param dest_node: Destination node name
980
  @type dest_ip: string
981
  @param dest_ip: IP address of destination node
982
  @type instance: L{objects.Instance}
983
  @param instance: Instance object
984
  @type all_transfers: list of L{DiskTransfer} instances
985
  @param all_transfers: List of all disk transfers to be made
986
  @rtype: list
987
  @return: List with a boolean (True=successful, False=failed) for success for
988
           each transfer
989

990
  """
991
  # Compress only if transfer is to another node
992
  if src_node == dest_node:
993
    compress = constants.IEC_NONE
994
  else:
995
    compress = constants.IEC_GZIP
996

    
997
  logging.debug("Source node %s, destination node %s, compression '%s'",
998
                src_node, dest_node, compress)
999

    
1000
  opts = objects.ImportExportOptions(key_name=None, ca_pem=None,
1001
                                     compress=compress)
1002

    
1003
  timeouts = ImportExportTimeouts(constants.DISK_TRANSFER_CONNECT_TIMEOUT)
1004
  src_cbs = _TransferInstSourceCb(lu, feedback_fn, instance, timeouts,
1005
                                  src_node, None, dest_node, dest_ip, opts)
1006
  dest_cbs = _TransferInstDestCb(lu, feedback_fn, instance, timeouts,
1007
                                 src_node, src_cbs, dest_node, dest_ip, opts)
1008

    
1009
  all_dtp = []
1010

    
1011
  ieloop = ImportExportLoop(lu)
1012
  try:
1013
    for transfer in all_transfers:
1014
      if transfer:
1015
        feedback_fn("Exporting %s from %s to %s" %
1016
                    (transfer.name, src_node, dest_node))
1017

    
1018
        dtp = _DiskTransferPrivate(transfer, True)
1019

    
1020
        di = DiskImport(lu, dest_node, opts, instance,
1021
                        transfer.dest_io, transfer.dest_ioargs,
1022
                        timeouts, dest_cbs, private=dtp)
1023
        ieloop.Add(di)
1024

    
1025
        dtp.dest_import = di
1026
      else:
1027
        dtp = _DiskTransferPrivate(None, False)
1028

    
1029
      all_dtp.append(dtp)
1030

    
1031
    ieloop.Run()
1032
  finally:
1033
    ieloop.FinalizeAll()
1034

    
1035
  assert len(all_dtp) == len(all_transfers)
1036
  assert compat.all([(dtp.src_export is None or
1037
                      dtp.src_export.success is not None) and
1038
                     (dtp.dest_import is None or
1039
                      dtp.dest_import.success is not None)
1040
                     for dtp in all_dtp]), \
1041
         "Not all imports/exports are finalized"
1042

    
1043
  return [bool(dtp.success) for dtp in all_dtp]
1044

    
1045

    
1046
class _RemoteExportCb(ImportExportCbBase):
1047
  def __init__(self, feedback_fn, disk_count):
1048
    """Initializes this class.
1049

1050
    """
1051
    ImportExportCbBase.__init__(self)
1052
    self._feedback_fn = feedback_fn
1053
    self._dresults = [None] * disk_count
1054

    
1055
  @property
1056
  def disk_results(self):
1057
    """Returns per-disk results.
1058

1059
    """
1060
    return self._dresults
1061

    
1062
  def ReportConnected(self, ie, private):
1063
    """Called when a connection has been established.
1064

1065
    """
1066
    (idx, _) = private
1067

    
1068
    self._feedback_fn("Disk %s is now sending data" % idx)
1069

    
1070
  def ReportProgress(self, ie, private):
1071
    """Called when new progress information should be reported.
1072

1073
    """
1074
    (idx, _) = private
1075

    
1076
    progress = ie.progress
1077
    if not progress:
1078
      return
1079

    
1080
    self._feedback_fn("Disk %s sent %s" % (idx, FormatProgress(progress)))
1081

    
1082
  def ReportFinished(self, ie, private):
1083
    """Called when a transfer has finished.
1084

1085
    """
1086
    (idx, finished_fn) = private
1087

    
1088
    if ie.success:
1089
      self._feedback_fn("Disk %s finished sending data" % idx)
1090
    else:
1091
      self._feedback_fn("Disk %s failed to send data: %s (recent output: %r)" %
1092
                        (idx, ie.final_message, ie.recent_output))
1093

    
1094
    self._dresults[idx] = bool(ie.success)
1095

    
1096
    if finished_fn:
1097
      finished_fn()
1098

    
1099

    
1100
class ExportInstanceHelper:
1101
  def __init__(self, lu, feedback_fn, instance):
1102
    """Initializes this class.
1103

1104
    @param lu: Logical unit instance
1105
    @param feedback_fn: Feedback function
1106
    @type instance: L{objects.Instance}
1107
    @param instance: Instance object
1108

1109
    """
1110
    self._lu = lu
1111
    self._feedback_fn = feedback_fn
1112
    self._instance = instance
1113

    
1114
    self._snap_disks = []
1115
    self._removed_snaps = [False] * len(instance.disks)
1116

    
1117
  def CreateSnapshots(self):
1118
    """Creates an LVM snapshot for every disk of the instance.
1119

1120
    """
1121
    assert not self._snap_disks
1122

    
1123
    instance = self._instance
1124
    src_node = instance.primary_node
1125

    
1126
    vgname = self._lu.cfg.GetVGName()
1127

    
1128
    for idx, disk in enumerate(instance.disks):
1129
      self._feedback_fn("Creating a snapshot of disk/%s on node %s" %
1130
                        (idx, src_node))
1131

    
1132
      # result.payload will be a snapshot of an lvm leaf of the one we
1133
      # passed
1134
      result = self._lu.rpc.call_blockdev_snapshot(src_node, disk)
1135
      msg = result.fail_msg
1136
      if msg:
1137
        self._lu.LogWarning("Could not snapshot disk/%s on node %s: %s",
1138
                            idx, src_node, msg)
1139
        new_dev = False
1140
      else:
1141
        disk_id = (vgname, result.payload)
1142
        new_dev = objects.Disk(dev_type=constants.LD_LV, size=disk.size,
1143
                               logical_id=disk_id, physical_id=disk_id,
1144
                               iv_name=disk.iv_name)
1145

    
1146
      self._snap_disks.append(new_dev)
1147

    
1148
    assert len(self._snap_disks) == len(instance.disks)
1149
    assert len(self._removed_snaps) == len(instance.disks)
1150

    
1151
  def _RemoveSnapshot(self, disk_index):
1152
    """Removes an LVM snapshot.
1153

1154
    @type disk_index: number
1155
    @param disk_index: Index of the snapshot to be removed
1156

1157
    """
1158
    disk = self._snap_disks[disk_index]
1159
    if disk and not self._removed_snaps[disk_index]:
1160
      src_node = self._instance.primary_node
1161

    
1162
      self._feedback_fn("Removing snapshot of disk/%s on node %s" %
1163
                        (disk_index, src_node))
1164

    
1165
      result = self._lu.rpc.call_blockdev_remove(src_node, disk)
1166
      if result.fail_msg:
1167
        self._lu.LogWarning("Could not remove snapshot for disk/%d from node"
1168
                            " %s: %s", disk_index, src_node, result.fail_msg)
1169
      else:
1170
        self._removed_snaps[disk_index] = True
1171

    
1172
  def LocalExport(self, dest_node):
1173
    """Intra-cluster instance export.
1174

1175
    @type dest_node: L{objects.Node}
1176
    @param dest_node: Destination node
1177

1178
    """
1179
    instance = self._instance
1180
    src_node = instance.primary_node
1181

    
1182
    assert len(self._snap_disks) == len(instance.disks)
1183

    
1184
    transfers = []
1185

    
1186
    for idx, dev in enumerate(self._snap_disks):
1187
      if not dev:
1188
        transfers.append(None)
1189
        continue
1190

    
1191
      path = utils.PathJoin(constants.EXPORT_DIR, "%s.new" % instance.name,
1192
                            dev.physical_id[1])
1193

    
1194
      finished_fn = compat.partial(self._TransferFinished, idx)
1195

    
1196
      # FIXME: pass debug option from opcode to backend
1197
      dt = DiskTransfer("snapshot/%s" % idx,
1198
                        constants.IEIO_SCRIPT, (dev, idx),
1199
                        constants.IEIO_FILE, (path, ),
1200
                        finished_fn)
1201
      transfers.append(dt)
1202

    
1203
    # Actually export data
1204
    dresults = TransferInstanceData(self._lu, self._feedback_fn,
1205
                                    src_node, dest_node.name,
1206
                                    dest_node.secondary_ip,
1207
                                    instance, transfers)
1208

    
1209
    assert len(dresults) == len(instance.disks)
1210

    
1211
    self._feedback_fn("Finalizing export on %s" % dest_node.name)
1212
    result = self._lu.rpc.call_finalize_export(dest_node.name, instance,
1213
                                               self._snap_disks)
1214
    msg = result.fail_msg
1215
    fin_resu = not msg
1216
    if msg:
1217
      self._lu.LogWarning("Could not finalize export for instance %s"
1218
                          " on node %s: %s", instance.name, dest_node.name, msg)
1219

    
1220
    return (fin_resu, dresults)
1221

    
1222
  def RemoteExport(self, opts, disk_info, timeouts):
1223
    """Inter-cluster instance export.
1224

1225
    @type opts: L{objects.ImportExportOptions}
1226
    @param opts: Import/export daemon options
1227
    @type disk_info: list
1228
    @param disk_info: Per-disk destination information
1229
    @type timeouts: L{ImportExportTimeouts}
1230
    @param timeouts: Timeouts for this import
1231

1232
    """
1233
    instance = self._instance
1234

    
1235
    assert len(disk_info) == len(instance.disks)
1236

    
1237
    cbs = _RemoteExportCb(self._feedback_fn, len(instance.disks))
1238

    
1239
    ieloop = ImportExportLoop(self._lu)
1240
    try:
1241
      for idx, (dev, (host, port, _, _)) in enumerate(zip(instance.disks,
1242
                                                          disk_info)):
1243
        self._feedback_fn("Sending disk %s to %s:%s" % (idx, host, port))
1244
        finished_fn = compat.partial(self._TransferFinished, idx)
1245
        ieloop.Add(DiskExport(self._lu, instance.primary_node,
1246
                              opts, host, port, instance,
1247
                              constants.IEIO_SCRIPT, (dev, idx),
1248
                              timeouts, cbs, private=(idx, finished_fn)))
1249

    
1250
      ieloop.Run()
1251
    finally:
1252
      ieloop.FinalizeAll()
1253

    
1254
    return (True, cbs.disk_results)
1255

    
1256
  def _TransferFinished(self, idx):
1257
    """Called once a transfer has finished.
1258

1259
    @type idx: number
1260
    @param idx: Disk index
1261

1262
    """
1263
    logging.debug("Transfer %s finished", idx)
1264
    self._RemoveSnapshot(idx)
1265

    
1266
  def Cleanup(self):
1267
    """Remove all snapshots.
1268

1269
    """
1270
    assert len(self._removed_snaps) == len(self._instance.disks)
1271
    for idx in range(len(self._instance.disks)):
1272
      self._RemoveSnapshot(idx)
1273

    
1274

    
1275
class _RemoteImportCb(ImportExportCbBase):
1276
  def __init__(self, feedback_fn, cds, x509_cert_pem, disk_count,
1277
               external_address):
1278
    """Initializes this class.
1279

1280
    @type cds: string
1281
    @param cds: Cluster domain secret
1282
    @type x509_cert_pem: string
1283
    @param x509_cert_pem: CA used for signing import key
1284
    @type disk_count: number
1285
    @param disk_count: Number of disks
1286
    @type external_address: string
1287
    @param external_address: External address of destination node
1288

1289
    """
1290
    ImportExportCbBase.__init__(self)
1291
    self._feedback_fn = feedback_fn
1292
    self._cds = cds
1293
    self._x509_cert_pem = x509_cert_pem
1294
    self._disk_count = disk_count
1295
    self._external_address = external_address
1296

    
1297
    self._dresults = [None] * disk_count
1298
    self._daemon_port = [None] * disk_count
1299

    
1300
    self._salt = utils.GenerateSecret(8)
1301

    
1302
  @property
1303
  def disk_results(self):
1304
    """Returns per-disk results.
1305

1306
    """
1307
    return self._dresults
1308

    
1309
  def _CheckAllListening(self):
1310
    """Checks whether all daemons are listening.
1311

1312
    If all daemons are listening, the information is sent to the client.
1313

1314
    """
1315
    if not compat.all(dp is not None for dp in self._daemon_port):
1316
      return
1317

    
1318
    host = self._external_address
1319

    
1320
    disks = []
1321
    for idx, port in enumerate(self._daemon_port):
1322
      disks.append(ComputeRemoteImportDiskInfo(self._cds, self._salt,
1323
                                               idx, host, port))
1324

    
1325
    assert len(disks) == self._disk_count
1326

    
1327
    self._feedback_fn(constants.ELOG_REMOTE_IMPORT, {
1328
      "disks": disks,
1329
      "x509_ca": self._x509_cert_pem,
1330
      })
1331

    
1332
  def ReportListening(self, ie, private):
1333
    """Called when daemon started listening.
1334

1335
    """
1336
    (idx, ) = private
1337

    
1338
    self._feedback_fn("Disk %s is now listening" % idx)
1339

    
1340
    assert self._daemon_port[idx] is None
1341

    
1342
    self._daemon_port[idx] = ie.listen_port
1343

    
1344
    self._CheckAllListening()
1345

    
1346
  def ReportConnected(self, ie, private):
1347
    """Called when a connection has been established.
1348

1349
    """
1350
    (idx, ) = private
1351

    
1352
    self._feedback_fn("Disk %s is now receiving data" % idx)
1353

    
1354
  def ReportFinished(self, ie, private):
1355
    """Called when a transfer has finished.
1356

1357
    """
1358
    (idx, ) = private
1359

    
1360
    # Daemon is certainly no longer listening
1361
    self._daemon_port[idx] = None
1362

    
1363
    if ie.success:
1364
      self._feedback_fn("Disk %s finished receiving data" % idx)
1365
    else:
1366
      self._feedback_fn(("Disk %s failed to receive data: %s"
1367
                         " (recent output: %r)") %
1368
                        (idx, ie.final_message, ie.recent_output))
1369

    
1370
    self._dresults[idx] = bool(ie.success)
1371

    
1372

    
1373
def RemoteImport(lu, feedback_fn, instance, source_x509_ca, cds, timeouts):
1374
  """Imports an instance from another cluster.
1375

1376
  @param lu: Logical unit instance
1377
  @param feedback_fn: Feedback function
1378
  @type instance: L{objects.Instance}
1379
  @param instance: Instance object
1380
  @type source_x509_ca: OpenSSL.crypto.X509
1381
  @param source_x509_ca: Import source's X509 CA
1382
  @type cds: string
1383
  @param cds: Cluster domain secret
1384
  @type timeouts: L{ImportExportTimeouts}
1385
  @param timeouts: Timeouts for this import
1386

1387
  """
1388
  source_ca_pem = OpenSSL.crypto.dump_certificate(OpenSSL.crypto.FILETYPE_PEM,
1389
                                                  source_x509_ca)
1390

    
1391
  # Create crypto key
1392
  result = lu.rpc.call_x509_cert_create(instance.primary_node,
1393
                                        constants.RIE_CERT_VALIDITY)
1394
  result.Raise("Can't create X509 key and certificate on %s" % result.node)
1395

    
1396
  (x509_key_name, x509_cert_pem) = result.payload
1397
  try:
1398
    # Load certificate
1399
    x509_cert = OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_PEM,
1400
                                                x509_cert_pem)
1401

    
1402
    # Import daemon options
1403
    opts = objects.ImportExportOptions(key_name=x509_key_name,
1404
                                       ca_pem=source_ca_pem)
1405

    
1406
    # Sign certificate
1407
    signed_x509_cert_pem = \
1408
      utils.SignX509Certificate(x509_cert, cds, utils.GenerateSecret(8))
1409

    
1410
    cbs = _RemoteImportCb(feedback_fn, cds, signed_x509_cert_pem,
1411
                          len(instance.disks), instance.primary_node)
1412

    
1413
    ieloop = ImportExportLoop(lu)
1414
    try:
1415
      for idx, dev in enumerate(instance.disks):
1416
        ieloop.Add(DiskImport(lu, instance.primary_node, opts, instance,
1417
                              constants.IEIO_SCRIPT, (dev, idx),
1418
                              timeouts, cbs, private=(idx, )))
1419

    
1420
      ieloop.Run()
1421
    finally:
1422
      ieloop.FinalizeAll()
1423
  finally:
1424
    # Remove crypto key and certificate
1425
    result = lu.rpc.call_x509_cert_remove(instance.primary_node, x509_key_name)
1426
    result.Raise("Can't remove X509 key and certificate on %s" % result.node)
1427

    
1428
  return cbs.disk_results
1429

    
1430

    
1431
def _GetImportExportHandshakeMessage(version):
1432
  """Returns the handshake message for a RIE protocol version.
1433

1434
  @type version: number
1435

1436
  """
1437
  return "%s:%s" % (version, constants.RIE_HANDSHAKE)
1438

    
1439

    
1440
def ComputeRemoteExportHandshake(cds):
1441
  """Computes the remote import/export handshake.
1442

1443
  @type cds: string
1444
  @param cds: Cluster domain secret
1445

1446
  """
1447
  salt = utils.GenerateSecret(8)
1448
  msg = _GetImportExportHandshakeMessage(constants.RIE_VERSION)
1449
  return (constants.RIE_VERSION, utils.Sha1Hmac(cds, msg, salt=salt), salt)
1450

    
1451

    
1452
def CheckRemoteExportHandshake(cds, handshake):
1453
  """Checks the handshake of a remote import/export.
1454

1455
  @type cds: string
1456
  @param cds: Cluster domain secret
1457
  @type handshake: sequence
1458
  @param handshake: Handshake sent by remote peer
1459

1460
  """
1461
  try:
1462
    (version, hmac_digest, hmac_salt) = handshake
1463
  except (TypeError, ValueError), err:
1464
    return "Invalid data: %s" % err
1465

    
1466
  if not utils.VerifySha1Hmac(cds, _GetImportExportHandshakeMessage(version),
1467
                              hmac_digest, salt=hmac_salt):
1468
    return "Hash didn't match, clusters don't share the same domain secret"
1469

    
1470
  if version != constants.RIE_VERSION:
1471
    return ("Clusters don't have the same remote import/export protocol"
1472
            " (local=%s, remote=%s)" %
1473
            (constants.RIE_VERSION, version))
1474

    
1475
  return None
1476

    
1477

    
1478
def _GetRieDiskInfoMessage(disk_index, host, port):
1479
  """Returns the hashed text for import/export disk information.
1480

1481
  @type disk_index: number
1482
  @param disk_index: Index of disk (included in hash)
1483
  @type host: string
1484
  @param host: Hostname
1485
  @type port: number
1486
  @param port: Daemon port
1487

1488
  """
1489
  return "%s:%s:%s" % (disk_index, host, port)
1490

    
1491

    
1492
def CheckRemoteExportDiskInfo(cds, disk_index, disk_info):
1493
  """Verifies received disk information for an export.
1494

1495
  @type cds: string
1496
  @param cds: Cluster domain secret
1497
  @type disk_index: number
1498
  @param disk_index: Index of disk (included in hash)
1499
  @type disk_info: sequence
1500
  @param disk_info: Disk information sent by remote peer
1501

1502
  """
1503
  try:
1504
    (host, port, hmac_digest, hmac_salt) = disk_info
1505
  except (TypeError, ValueError), err:
1506
    raise errors.GenericError("Invalid data: %s" % err)
1507

    
1508
  if not (host and port):
1509
    raise errors.GenericError("Missing destination host or port")
1510

    
1511
  msg = _GetRieDiskInfoMessage(disk_index, host, port)
1512

    
1513
  if not utils.VerifySha1Hmac(cds, msg, hmac_digest, salt=hmac_salt):
1514
    raise errors.GenericError("HMAC is wrong")
1515

    
1516
  return (host, port)
1517

    
1518

    
1519
def ComputeRemoteImportDiskInfo(cds, salt, disk_index, host, port):
1520
  """Computes the signed disk information for a remote import.
1521

1522
  @type cds: string
1523
  @param cds: Cluster domain secret
1524
  @type salt: string
1525
  @param salt: HMAC salt
1526
  @type disk_index: number
1527
  @param disk_index: Index of disk (included in hash)
1528
  @type host: string
1529
  @param host: Hostname
1530
  @type port: number
1531
  @param port: Daemon port
1532

1533
  """
1534
  msg = _GetRieDiskInfoMessage(disk_index, host, port)
1535
  hmac_digest = utils.Sha1Hmac(cds, msg, salt=salt)
1536
  return (host, port, hmac_digest, salt)