Statistics
| Branch: | Tag: | Revision:

root / lib / masterd / instance.py @ af1d39b1

History | View | Annotate | Download (42.1 kB)

1
#
2
#
3

    
4
# Copyright (C) 2010 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Instance-related functions and classes for masterd.
23

24
"""
25

    
26
import logging
27
import time
28
import OpenSSL
29

    
30
from ganeti import constants
31
from ganeti import errors
32
from ganeti import compat
33
from ganeti import utils
34
from ganeti import objects
35

    
36

    
37
class _ImportExportError(Exception):
  """Local exception to report import/export errors.

  Raised internally by the import/export state machine; callers of the loop
  see it converted into a final error status via C{Finalize(error=...)}
  rather than as a propagated exception.

  """
42

    
43
class ImportExportTimeouts(object):
  """Container for the various timeouts of one import/export.

  """
  #: Time until daemon starts writing status file
  DEFAULT_READY_TIMEOUT = 10

  #: Length of time until errors cause hard failure
  DEFAULT_ERROR_TIMEOUT = 10

  #: Time after which daemon must be listening
  DEFAULT_LISTEN_TIMEOUT = 10

  #: Progress update interval
  DEFAULT_PROGRESS_INTERVAL = 60

  __slots__ = [
    "error",
    "ready",
    "listen",
    "connect",
    "progress",
    ]

  def __init__(self, connect,
               listen=DEFAULT_LISTEN_TIMEOUT,
               error=DEFAULT_ERROR_TIMEOUT,
               ready=DEFAULT_READY_TIMEOUT,
               progress=DEFAULT_PROGRESS_INTERVAL):
    """Initializes this class.

    @type connect: number
    @param connect: Timeout for establishing connection
    @type listen: number
    @param listen: Timeout for starting to listen for connections
    @type error: number
    @param error: Length of time until errors cause hard failure
    @type ready: number
    @param ready: Timeout for daemon to become ready
    @type progress: number
    @param progress: Progress update interval

    """
    # Only "connect" has no default; it depends on the kind of transfer
    self.connect = connect
    self.listen = listen
    self.ready = ready
    self.error = error
    self.progress = progress
89

    
90
class ImportExportCbBase(object):
  """Callbacks for disk import/export.

  All methods are no-ops by default; subclasses override the events they
  care about.

  """
  def ReportListening(self, ie, private):
    """Called when daemon started listening.

    @type ie: Subclass of L{_DiskImportExportBase}
    @param ie: Import/export object
    @param private: Private data passed to import/export object

    """

  def ReportConnected(self, ie, private):
    """Called when a connection has been established.

    @type ie: Subclass of L{_DiskImportExportBase}
    @param ie: Import/export object
    @param private: Private data passed to import/export object

    """

  def ReportProgress(self, ie, private):
    """Called when new progress information should be reported.

    @type ie: Subclass of L{_DiskImportExportBase}
    @param ie: Import/export object
    @param private: Private data passed to import/export object

    """

  def ReportFinished(self, ie, private):
    """Called when a transfer has finished.

    @type ie: Subclass of L{_DiskImportExportBase}
    @param ie: Import/export object
    @param private: Private data passed to import/export object

    """
130

    
131
def _TimeoutExpired(epoch, timeout, _time_fn=time.time):
132
  """Checks whether a timeout has expired.
133

134
  """
135
  return _time_fn() > (epoch + timeout)
136

    
137

    
138
class _DiskImportExportBase(object):
  """Base class for a single disk import or export.

  Tracks the lifecycle of one remote import/export daemon: start
  (L{CheckDaemon}), status polling (L{SetDaemonData}), listening/connection
  checks, completion (L{CheckFinished}) and cleanup (L{Finalize}).  Driven
  externally by L{ImportExportLoop}.

  """
  # Subclasses must set this to "import" or "export" (used in log messages)
  MODE_TEXT = None

  def __init__(self, lu, node_name, opts,
               instance, timeouts, cbs, private=None):
    """Initializes this class.

    @param lu: Logical unit instance
    @type node_name: string
    @param node_name: Node name for import
    @type opts: L{objects.ImportExportOptions}
    @param opts: Import/export daemon options
    @type instance: L{objects.Instance}
    @param instance: Instance object
    @type timeouts: L{ImportExportTimeouts}
    @param timeouts: Timeouts for this import
    @type cbs: L{ImportExportCbBase}
    @param cbs: Callbacks
    @param private: Private data for callback functions

    """
    assert self.MODE_TEXT

    self._lu = lu
    self.node_name = node_name
    self._opts = opts
    self._instance = instance
    self._timeouts = timeouts
    self._cbs = cbs
    self._private = private

    # Parent loop
    self._loop = None

    # Timestamps; each is None until the corresponding event happens, and
    # several methods use them as "has this happened yet" flags
    self._ts_begin = None
    self._ts_connected = None
    self._ts_finished = None
    self._ts_cleanup = None
    self._ts_last_progress = None
    self._ts_last_error = None

    # Transfer status; success stays None while the transfer is active
    self.success = None
    self.final_message = None

    # Daemon status
    self._daemon_name = None
    self._daemon = None

  @property
  def recent_output(self):
    """Returns the most recent output from the daemon.

    """
    if self._daemon:
      return self._daemon.recent_output

    return None

  @property
  def progress(self):
    """Returns transfer progress information.

    """
    if not self._daemon:
      return None

    return (self._daemon.progress_mbytes,
            self._daemon.progress_throughput,
            self._daemon.progress_percent,
            self._daemon.progress_eta)

  @property
  def magic(self):
    """Returns the magic value for this import/export.

    """
    return self._opts.magic

  @property
  def active(self):
    """Determines whether this transport is still active.

    """
    return self.success is None

  @property
  def loop(self):
    """Returns parent loop.

    @rtype: L{ImportExportLoop}

    """
    return self._loop

  def SetLoop(self, loop):
    """Sets the parent loop.

    @type loop: L{ImportExportLoop}

    """
    if self._loop:
      raise errors.ProgrammerError("Loop can only be set once")

    self._loop = loop

  def _StartDaemon(self):
    """Starts the import/export daemon.

    """
    raise NotImplementedError()

  def CheckDaemon(self):
    """Checks whether daemon has been started and if not, starts it.

    @rtype: string
    @return: Daemon name

    """
    assert self._ts_cleanup is None

    if self._daemon_name is None:
      assert self._ts_begin is None

      result = self._StartDaemon()
      if result.fail_msg:
        raise _ImportExportError("Failed to start %s on %s: %s" %
                                 (self.MODE_TEXT, self.node_name,
                                  result.fail_msg))

      daemon_name = result.payload

      logging.info("Started %s %r on %s", self.MODE_TEXT, daemon_name,
                   self.node_name)

      # _ts_begin doubles as the epoch for the ready/listen timeouts
      self._ts_begin = time.time()
      self._daemon_name = daemon_name

    return self._daemon_name

  def GetDaemonName(self):
    """Returns the daemon name.

    """
    assert self._daemon_name, "Daemon has not been started"
    assert self._ts_cleanup is None
    return self._daemon_name

  def Abort(self):
    """Sends SIGTERM to import/export daemon (if still active).

    @rtype: bool
    @return: Whether the abort RPC succeeded (also C{True} if there was no
      daemon to abort)

    """
    if self._daemon_name:
      self._lu.LogWarning("Aborting %s %r on %s",
                          self.MODE_TEXT, self._daemon_name, self.node_name)
      result = self._lu.rpc.call_impexp_abort(self.node_name, self._daemon_name)
      if result.fail_msg:
        self._lu.LogWarning("Failed to abort %s %r on %s: %s",
                            self.MODE_TEXT, self._daemon_name,
                            self.node_name, result.fail_msg)
        return False

    return True

  def _SetDaemonData(self, data):
    """Internal function for updating status daemon data.

    @type data: L{objects.ImportExportStatus}
    @param data: Daemon status data
    @rtype: bool
    @return: Whether status data was available

    """
    assert self._ts_begin is not None

    if not data:
      # Daemon hasn't written its status file yet; fail hard once the ready
      # timeout has elapsed
      if _TimeoutExpired(self._ts_begin, self._timeouts.ready):
        raise _ImportExportError("Didn't become ready after %s seconds" %
                                 self._timeouts.ready)

      return False

    self._daemon = data

    return True

  def SetDaemonData(self, success, data):
    """Updates daemon status data.

    @type success: bool
    @param success: Whether fetching data was successful or not
    @type data: L{objects.ImportExportStatus}
    @param data: Daemon status data
    @rtype: bool
    @return: Whether fresh status data was recorded

    """
    if not success:
      # Tolerate transient status-fetch failures until the error timeout
      # expires, measured from the first failure
      if self._ts_last_error is None:
        self._ts_last_error = time.time()

      elif _TimeoutExpired(self._ts_last_error, self._timeouts.error):
        raise _ImportExportError("Too many errors while updating data")

      return False

    # Successful fetch resets the error timer
    self._ts_last_error = None

    return self._SetDaemonData(data)

  def CheckListening(self):
    """Checks whether the daemon is listening.

    """
    raise NotImplementedError()

  def _GetConnectedCheckEpoch(self):
    """Returns timeout to calculate connect timeout.

    """
    raise NotImplementedError()

  def CheckConnected(self):
    """Checks whether the daemon is connected.

    @rtype: bool
    @return: Whether the daemon is connected

    """
    assert self._daemon, "Daemon status missing"

    if self._ts_connected is not None:
      return True

    if self._daemon.connected:
      self._ts_connected = time.time()

      # TODO: Log remote peer
      logging.debug("%s %r on %s is now connected",
                    self.MODE_TEXT, self._daemon_name, self.node_name)

      self._cbs.ReportConnected(self, self._private)

      return True

    if _TimeoutExpired(self._GetConnectedCheckEpoch(), self._timeouts.connect):
      raise _ImportExportError("Not connected after %s seconds" %
                               self._timeouts.connect)

    return False

  def _CheckProgress(self):
    """Checks whether a progress update should be reported.

    """
    # Report only at most once per progress interval, and only when the
    # daemon has actual progress numbers to show
    if ((self._ts_last_progress is None or
         _TimeoutExpired(self._ts_last_progress, self._timeouts.progress)) and
        self._daemon and
        self._daemon.progress_mbytes is not None and
        self._daemon.progress_throughput is not None):
      self._cbs.ReportProgress(self, self._private)
      self._ts_last_progress = time.time()

  def CheckFinished(self):
    """Checks whether the daemon exited.

    @rtype: bool
    @return: Whether the transfer is finished

    """
    assert self._daemon, "Daemon status missing"

    if self._ts_finished:
      return True

    if self._daemon.exit_status is None:
      # TODO: Adjust delay for ETA expiring soon
      self._CheckProgress()
      return False

    self._ts_finished = time.time()

    self._ReportFinished(self._daemon.exit_status == 0,
                         self._daemon.error_message)

    return True

  def _ReportFinished(self, success, message):
    """Transfer is finished or daemon exited.

    @type success: bool
    @param success: Whether the transfer was successful
    @type message: string
    @param message: Error message

    """
    assert self.success is None

    self.success = success
    self.final_message = message

    if success:
      logging.info("%s %r on %s succeeded", self.MODE_TEXT, self._daemon_name,
                   self.node_name)
    elif self._daemon_name:
      self._lu.LogWarning("%s %r on %s failed: %s",
                          self.MODE_TEXT, self._daemon_name, self.node_name,
                          message)
    else:
      # Daemon was never started (e.g. the start RPC itself failed)
      self._lu.LogWarning("%s on %s failed: %s", self.MODE_TEXT,
                          self.node_name, message)

    self._cbs.ReportFinished(self, self._private)

  def _Finalize(self):
    """Makes the RPC call to finalize this import/export.

    """
    return self._lu.rpc.call_impexp_cleanup(self.node_name, self._daemon_name)

  def Finalize(self, error=None):
    """Finalizes this import/export.

    @param error: Optional error message, recorded as the final status

    """
    if self._daemon_name:
      logging.info("Finalizing %s %r on %s",
                   self.MODE_TEXT, self._daemon_name, self.node_name)

      result = self._Finalize()
      if result.fail_msg:
        self._lu.LogWarning("Failed to finalize %s %r on %s: %s",
                            self.MODE_TEXT, self._daemon_name,
                            self.node_name, result.fail_msg)
        return False

      # Daemon is no longer running
      self._daemon_name = None
      self._ts_cleanup = time.time()

    if error:
      self._ReportFinished(False, error)

    return True

    
479

    
480
class DiskImport(_DiskImportExportBase):
  """Single disk import from a remote node.

  """
  MODE_TEXT = "import"

  def __init__(self, lu, node_name, opts, instance,
               dest, dest_args, timeouts, cbs, private=None):
    """Initializes this class.

    @param lu: Logical unit instance
    @type node_name: string
    @param node_name: Node name for import
    @type opts: L{objects.ImportExportOptions}
    @param opts: Import/export daemon options
    @type instance: L{objects.Instance}
    @param instance: Instance object
    @param dest: I/O destination
    @param dest_args: I/O arguments
    @type timeouts: L{ImportExportTimeouts}
    @param timeouts: Timeouts for this import
    @type cbs: L{ImportExportCbBase}
    @param cbs: Callbacks
    @param private: Private data for callback functions

    """
    _DiskImportExportBase.__init__(self, lu, node_name, opts,
                                   instance, timeouts, cbs, private)

    # Where incoming data ends up
    self._dest = dest
    self._dest_args = dest_args

    # Timestamp of when the daemon was first seen listening
    self._ts_listening = None

  @property
  def listen_port(self):
    """Returns the port the daemon is listening on.

    """
    return self._daemon.listen_port if self._daemon else None

  def _StartDaemon(self):
    """Starts the import daemon.

    """
    rpc = self._lu.rpc
    return rpc.call_import_start(self.node_name, self._opts,
                                 self._instance,
                                 self._dest, self._dest_args)

  def CheckListening(self):
    """Checks whether the daemon is listening.

    @rtype: bool
    @return: Whether the daemon is listening

    """
    assert self._daemon, "Daemon status missing"

    if self._ts_listening is not None:
      return True

    listen_port = self._daemon.listen_port
    if listen_port is None:
      # Not listening yet; fail hard once the listen timeout has elapsed
      if _TimeoutExpired(self._ts_begin, self._timeouts.listen):
        raise _ImportExportError("Not listening after %s seconds" %
                                 self._timeouts.listen)
      return False

    self._ts_listening = time.time()

    logging.debug("Import %r on %s is now listening on port %s",
                  self._daemon_name, self.node_name, listen_port)

    self._cbs.ReportListening(self, self._private)

    return True

  def _GetConnectedCheckEpoch(self):
    """Returns the time since we started listening.

    """
    assert self._ts_listening is not None, \
           ("Checking whether an import is connected is only useful"
            " once it's been listening")

    return self._ts_listening

    
568

    
569
class DiskExport(_DiskImportExportBase):
  """Single disk export to a remote node.

  """
  MODE_TEXT = "export"

  def __init__(self, lu, node_name, opts,
               dest_host, dest_port, instance, source, source_args,
               timeouts, cbs, private=None):
    """Initializes this class.

    @param lu: Logical unit instance
    @type node_name: string
    @param node_name: Node name for import
    @type opts: L{objects.ImportExportOptions}
    @param opts: Import/export daemon options
    @type dest_host: string
    @param dest_host: Destination host name or IP address
    @type dest_port: number
    @param dest_port: Destination port number
    @type instance: L{objects.Instance}
    @param instance: Instance object
    @param source: I/O source
    @param source_args: I/O source arguments
    @type timeouts: L{ImportExportTimeouts}
    @param timeouts: Timeouts for this import
    @type cbs: L{ImportExportCbBase}
    @param cbs: Callbacks
    @param private: Private data for callback functions

    """
    _DiskImportExportBase.__init__(self, lu, node_name, opts,
                                   instance, timeouts, cbs, private)

    # Remote endpoint the export daemon connects to
    self._dest_host = dest_host
    self._dest_port = dest_port

    # Local data source
    self._source = source
    self._source_args = source_args

  def _StartDaemon(self):
    """Starts the export daemon.

    """
    rpc = self._lu.rpc
    return rpc.call_export_start(self.node_name, self._opts,
                                 self._dest_host, self._dest_port,
                                 self._instance, self._source,
                                 self._source_args)

  def CheckListening(self):
    """Checks whether the daemon is listening.

    """
    # An export connects out; only an import daemon listens
    return True

  def _GetConnectedCheckEpoch(self):
    """Returns the time since the daemon started.

    """
    assert self._ts_begin is not None

    return self._ts_begin

    
628

    
629
def FormatProgress(progress):
  """Formats progress information for user consumption

  @type progress: tuple
  @param progress: Tuple of C{(mbytes, throughput, percent, eta)}; the last
    two may be C{None}
  @rtype: string

  """
  (mbytes, throughput, percent, eta) = progress

  parts = [
    utils.FormatUnit(mbytes, "h"),

    # Not using FormatUnit as it doesn't support kilobytes
    "%0.1f MiB/s" % throughput,
    ]

  # Percentage and ETA are optional
  if percent is not None:
    parts.append("%d%%" % percent)
  if eta is not None:
    parts.append("ETA %s" % utils.FormatSeconds(eta))

  return utils.CommaJoin(parts)

    
650

    
651
class ImportExportLoop:
652
  MIN_DELAY = 1.0
653
  MAX_DELAY = 20.0
654

    
655
  def __init__(self, lu):
656
    """Initializes this class.
657

658
    """
659
    self._lu = lu
660
    self._queue = []
661
    self._pending_add = []
662

    
663
  def Add(self, diskie):
664
    """Adds an import/export object to the loop.
665

666
    @type diskie: Subclass of L{_DiskImportExportBase}
667
    @param diskie: Import/export object
668

669
    """
670
    assert diskie not in self._pending_add
671
    assert diskie.loop is None
672

    
673
    diskie.SetLoop(self)
674

    
675
    # Adding new objects to a staging list is necessary, otherwise the main
676
    # loop gets confused if callbacks modify the queue while the main loop is
677
    # iterating over it.
678
    self._pending_add.append(diskie)
679

    
680
  @staticmethod
681
  def _CollectDaemonStatus(lu, daemons):
682
    """Collects the status for all import/export daemons.
683

684
    """
685
    daemon_status = {}
686

    
687
    for node_name, names in daemons.iteritems():
688
      result = lu.rpc.call_impexp_status(node_name, names)
689
      if result.fail_msg:
690
        lu.LogWarning("Failed to get daemon status on %s: %s",
691
                      node_name, result.fail_msg)
692
        continue
693

    
694
      assert len(names) == len(result.payload)
695

    
696
      daemon_status[node_name] = dict(zip(names, result.payload))
697

    
698
    return daemon_status
699

    
700
  @staticmethod
701
  def _GetActiveDaemonNames(queue):
702
    """Gets the names of all active daemons.
703

704
    """
705
    result = {}
706
    for diskie in queue:
707
      if not diskie.active:
708
        continue
709

    
710
      try:
711
        # Start daemon if necessary
712
        daemon_name = diskie.CheckDaemon()
713
      except _ImportExportError, err:
714
        logging.exception("%s failed", diskie.MODE_TEXT)
715
        diskie.Finalize(error=str(err))
716
        continue
717

    
718
      result.setdefault(diskie.node_name, []).append(daemon_name)
719

    
720
    assert len(queue) >= len(result)
721
    assert len(queue) >= sum([len(names) for names in result.itervalues()])
722

    
723
    logging.debug("daemons=%r", result)
724

    
725
    return result
726

    
727
  def _AddPendingToQueue(self):
728
    """Adds all pending import/export objects to the internal queue.
729

730
    """
731
    assert compat.all(diskie not in self._queue and diskie.loop == self
732
                      for diskie in self._pending_add)
733

    
734
    self._queue.extend(self._pending_add)
735

    
736
    del self._pending_add[:]
737

    
738
  def Run(self):
739
    """Utility main loop.
740

741
    """
742
    while True:
743
      self._AddPendingToQueue()
744

    
745
      # Collect all active daemon names
746
      daemons = self._GetActiveDaemonNames(self._queue)
747
      if not daemons:
748
        break
749

    
750
      # Collection daemon status data
751
      data = self._CollectDaemonStatus(self._lu, daemons)
752

    
753
      # Use data
754
      delay = self.MAX_DELAY
755
      for diskie in self._queue:
756
        if not diskie.active:
757
          continue
758

    
759
        try:
760
          try:
761
            all_daemon_data = data[diskie.node_name]
762
          except KeyError:
763
            result = diskie.SetDaemonData(False, None)
764
          else:
765
            result = \
766
              diskie.SetDaemonData(True,
767
                                   all_daemon_data[diskie.GetDaemonName()])
768

    
769
          if not result:
770
            # Daemon not yet ready, retry soon
771
            delay = min(3.0, delay)
772
            continue
773

    
774
          if diskie.CheckFinished():
775
            # Transfer finished
776
            diskie.Finalize()
777
            continue
778

    
779
          # Normal case: check again in 5 seconds
780
          delay = min(5.0, delay)
781

    
782
          if not diskie.CheckListening():
783
            # Not yet listening, retry soon
784
            delay = min(1.0, delay)
785
            continue
786

    
787
          if not diskie.CheckConnected():
788
            # Not yet connected, retry soon
789
            delay = min(1.0, delay)
790
            continue
791

    
792
        except _ImportExportError, err:
793
          logging.exception("%s failed", diskie.MODE_TEXT)
794
          diskie.Finalize(error=str(err))
795

    
796
      if not compat.any([diskie.active for diskie in self._queue]):
797
        break
798

    
799
      # Wait a bit
800
      delay = min(self.MAX_DELAY, max(self.MIN_DELAY, delay))
801
      logging.debug("Waiting for %ss", delay)
802
      time.sleep(delay)
803

    
804
  def FinalizeAll(self):
805
    """Finalizes all pending transfers.
806

807
    """
808
    success = True
809

    
810
    for diskie in self._queue:
811
      success = diskie.Finalize() and success
812

    
813
    return success
814

    
815

    
816
class _TransferInstCbBase(ImportExportCbBase):
  """Shared context for the source/destination transfer callbacks.

  """
  def __init__(self, lu, feedback_fn, instance, timeouts, src_node, src_cbs,
               dest_node, dest_ip, export_opts):
    """Initializes this class.

    """
    ImportExportCbBase.__init__(self)

    # Cluster/operation context
    self.lu = lu
    self.feedback_fn = feedback_fn
    self.instance = instance
    self.timeouts = timeouts

    # Source side
    self.src_node = src_node
    self.src_cbs = src_cbs

    # Destination side
    self.dest_node = dest_node
    self.dest_ip = dest_ip
    self.export_opts = export_opts
    
834

    
835
class _TransferInstSourceCb(_TransferInstCbBase):
  """Callbacks for the exporting (source) side of a transfer.

  """
  def ReportConnected(self, ie, dtp):
    """Called when a connection has been established.

    """
    assert self.src_cbs is None
    assert dtp.src_export == ie
    assert dtp.dest_import

    self.feedback_fn("%s is sending data on %s" %
                     (dtp.data.name, ie.node_name))

  def ReportProgress(self, ie, dtp):
    """Called when new progress information should be reported.

    """
    progress = ie.progress
    if progress:
      self.feedback_fn("%s sent %s" % (dtp.data.name, FormatProgress(progress)))

  def ReportFinished(self, ie, dtp):
    """Called when a transfer has finished.

    """
    assert self.src_cbs is None
    assert dtp.src_export == ie
    assert dtp.dest_import

    if ie.success:
      msg = "%s finished sending data" % dtp.data.name
    else:
      msg = ("%s failed to send data: %s (recent output: %r)" %
             (dtp.data.name, ie.final_message, ie.recent_output))
    self.feedback_fn(msg)

    dtp.RecordResult(ie.success)

    cb = dtp.data.finished_fn
    if cb:
      cb()

    # TODO: Check whether sending SIGTERM right away is okay, maybe we should
    # give the daemon a moment to sort things out
    if dtp.dest_import and not ie.success:
      dtp.dest_import.Abort()

    
882

    
883
class _TransferInstDestCb(_TransferInstCbBase):
  """Callbacks for the importing (destination) side of a transfer.

  """
  def ReportListening(self, ie, dtp):
    """Called when daemon started listening.

    """
    assert self.src_cbs
    assert dtp.src_export is None
    assert dtp.dest_import

    self.feedback_fn("%s is now listening, starting export" % dtp.data.name)

    # Start export on source node
    export = DiskExport(self.lu, self.src_node, self.export_opts,
                        self.dest_ip, ie.listen_port,
                        self.instance, dtp.data.src_io, dtp.data.src_ioargs,
                        self.timeouts, self.src_cbs, private=dtp)
    ie.loop.Add(export)

    dtp.src_export = export

  def ReportConnected(self, ie, dtp):
    """Called when a connection has been established.

    """
    self.feedback_fn("%s is receiving data on %s" %
                     (dtp.data.name, self.dest_node))

  def ReportFinished(self, ie, dtp):
    """Called when a transfer has finished.

    """
    if ie.success:
      msg = "%s finished receiving data" % dtp.data.name
    else:
      msg = ("%s failed to receive data: %s (recent output: %r)" %
             (dtp.data.name, ie.final_message, ie.recent_output))
    self.feedback_fn(msg)

    dtp.RecordResult(ie.success)

    # TODO: Check whether sending SIGTERM right away is okay, maybe we should
    # give the daemon a moment to sort things out
    if dtp.src_export and not ie.success:
      dtp.src_export.Abort()

    
927

    
928
class DiskTransfer(object):
  """Description of a single disk transfer (source, destination, callback).

  """
  def __init__(self, name, src_io, src_ioargs, dest_io, dest_ioargs,
               finished_fn):
    """Initializes this class.

    @type name: string
    @param name: User-visible name for this transfer (e.g. "disk/0")
    @param src_io: Source I/O type
    @param src_ioargs: Source I/O arguments
    @param dest_io: Destination I/O type
    @param dest_ioargs: Destination I/O arguments
    @type finished_fn: callable
    @param finished_fn: Function called once transfer has finished

    """
    self.name = name

    # Source side
    self.src_io = src_io
    self.src_ioargs = src_ioargs

    # Destination side
    self.dest_io = dest_io
    self.dest_ioargs = dest_ioargs

    # Completion callback (may be None)
    self.finished_fn = finished_fn

    
953

    
954
class _DiskTransferPrivate(object):
955
  def __init__(self, data, success):
956
    """Initializes this class.
957

958
    @type data: L{DiskTransfer}
959
    @type success: bool
960

961
    """
962
    self.data = data
963

    
964
    self.src_export = None
965
    self.dest_import = None
966

    
967
    self.success = success
968

    
969
  def RecordResult(self, success):
970
    """Updates the status.
971

972
    One failed part will cause the whole transfer to fail.
973

974
    """
975
    self.success = self.success and success
976

    
977

    
978
def TransferInstanceData(lu, feedback_fn, src_node, dest_node, dest_ip,
                         instance, all_transfers):
  """Transfers an instance's data from one node to another.

  @param lu: Logical unit instance
  @param feedback_fn: Feedback function
  @type src_node: string
  @param src_node: Source node name
  @type dest_node: string
  @param dest_node: Destination node name
  @type dest_ip: string
  @param dest_ip: IP address of destination node
  @type instance: L{objects.Instance}
  @param instance: Instance object
  @type all_transfers: list of L{DiskTransfer} instances
  @param all_transfers: List of all disk transfers to be made
  @rtype: list
  @return: List with a boolean (True=successful, False=failed) for success for
           each transfer

  """
  # Compress only if transfer is to another node
  if src_node == dest_node:
    compress = constants.IEC_NONE
  else:
    compress = constants.IEC_GZIP

  logging.debug("Source node %s, destination node %s, compression '%s'",
                src_node, dest_node, compress)

  # No key/CA needed for intra-cluster transfers
  opts = objects.ImportExportOptions(key_name=None, ca_pem=None,
                                     compress=compress)

  # One shared callback object per side; the destination callbacks get a
  # reference to the source callbacks
  timeouts = ImportExportTimeouts(constants.DISK_TRANSFER_CONNECT_TIMEOUT)
  src_cbs = _TransferInstSourceCb(lu, feedback_fn, instance, timeouts,
                                  src_node, None, dest_node, dest_ip, opts)
  dest_cbs = _TransferInstDestCb(lu, feedback_fn, instance, timeouts,
                                 src_node, src_cbs, dest_node, dest_ip, opts)

  all_dtp = []

  ieloop = ImportExportLoop(lu)
  try:
    for transfer in all_transfers:
      if transfer:
        feedback_fn("Exporting %s from %s to %s" %
                    (transfer.name, src_node, dest_node))

        # Start optimistically; RecordResult degrades this on failure
        dtp = _DiskTransferPrivate(transfer, True)

        di = DiskImport(lu, dest_node, opts, instance,
                        transfer.dest_io, transfer.dest_ioargs,
                        timeouts, dest_cbs, private=dtp)
        ieloop.Add(di)

        dtp.dest_import = di
      else:
        # Missing transfers (None entries) are recorded as failed immediately
        dtp = _DiskTransferPrivate(None, False)

      all_dtp.append(dtp)

    ieloop.Run()
  finally:
    ieloop.FinalizeAll()

  assert len(all_dtp) == len(all_transfers)
  assert compat.all([(dtp.src_export is None or
                      dtp.src_export.success is not None) and
                     (dtp.dest_import is None or
                      dtp.dest_import.success is not None)
                     for dtp in all_dtp]), \
         "Not all imports/exports are finalized"

  return [bool(dtp.success) for dtp in all_dtp]
class _RemoteExportCb(ImportExportCbBase):
  """Callbacks for exporting disks to a remote cluster.

  """
  def __init__(self, feedback_fn, disk_count):
    """Initializes this class.

    @param feedback_fn: Feedback function
    @type disk_count: number
    @param disk_count: Number of disks being exported

    """
    ImportExportCbBase.__init__(self)
    self._feedback_fn = feedback_fn
    # One result slot per disk, None until the transfer finishes
    self._dresults = disk_count * [None]

  @property
  def disk_results(self):
    """Returns per-disk results.

    """
    return self._dresults

  def ReportConnected(self, ie, private):
    """Called when a connection has been established.

    """
    (idx, _) = private
    self._feedback_fn("Disk %s is now sending data" % idx)

  def ReportProgress(self, ie, private):
    """Called when new progress information should be reported.

    """
    (idx, _) = private
    progress = ie.progress
    if progress:
      self._feedback_fn("Disk %s sent %s" % (idx, FormatProgress(progress)))

  def ReportFinished(self, ie, private):
    """Called when a transfer has finished.

    """
    (idx, finished_fn) = private

    if not ie.success:
      self._feedback_fn("Disk %s failed to send data: %s (recent output: %r)" %
                        (idx, ie.final_message, ie.recent_output))
    else:
      self._feedback_fn("Disk %s finished sending data" % idx)

    self._dresults[idx] = bool(ie.success)

    if finished_fn:
      finished_fn()
class ExportInstanceHelper(object):
  # Fixed: was an old-style class ("class ExportInstanceHelper:"); every
  # other class in this module derives from object
  def __init__(self, lu, feedback_fn, instance):
    """Initializes this class.

    @param lu: Logical unit instance
    @param feedback_fn: Feedback function
    @type instance: L{objects.Instance}
    @param instance: Instance object

    """
    self._lu = lu
    self._feedback_fn = feedback_fn
    self._instance = instance

    # Per-disk snapshot objects; filled by CreateSnapshots (False entries
    # mark disks whose snapshot failed)
    self._snap_disks = []
    # Tracks which snapshots were already removed
    self._removed_snaps = [False] * len(instance.disks)

  def CreateSnapshots(self):
    """Creates an LVM snapshot for every disk of the instance.

    """
    assert not self._snap_disks

    instance = self._instance
    src_node = instance.primary_node

    vgname = self._lu.cfg.GetVGName()

    for idx, disk in enumerate(instance.disks):
      self._feedback_fn("Creating a snapshot of disk/%s on node %s" %
                        (idx, src_node))

      # result.payload will be a snapshot of an lvm leaf of the one we
      # passed
      result = self._lu.rpc.call_blockdev_snapshot(src_node, disk)
      msg = result.fail_msg
      if msg:
        self._lu.LogWarning("Could not snapshot disk/%s on node %s: %s",
                            idx, src_node, msg)
        # Record the failure but keep list indexes aligned with the disks
        new_dev = False
      else:
        disk_id = (vgname, result.payload)
        new_dev = objects.Disk(dev_type=constants.LD_LV, size=disk.size,
                               logical_id=disk_id, physical_id=disk_id,
                               iv_name=disk.iv_name)

      self._snap_disks.append(new_dev)

    assert len(self._snap_disks) == len(instance.disks)
    assert len(self._removed_snaps) == len(instance.disks)

  def _RemoveSnapshot(self, disk_index):
    """Removes an LVM snapshot.

    @type disk_index: number
    @param disk_index: Index of the snapshot to be removed

    """
    disk = self._snap_disks[disk_index]
    # Skip disks without a snapshot and snapshots removed earlier
    if disk and not self._removed_snaps[disk_index]:
      src_node = self._instance.primary_node

      self._feedback_fn("Removing snapshot of disk/%s on node %s" %
                        (disk_index, src_node))

      result = self._lu.rpc.call_blockdev_remove(src_node, disk)
      if result.fail_msg:
        self._lu.LogWarning("Could not remove snapshot for disk/%d from node"
                            " %s: %s", disk_index, src_node, result.fail_msg)
      else:
        self._removed_snaps[disk_index] = True

  def LocalExport(self, dest_node):
    """Intra-cluster instance export.

    @type dest_node: L{objects.Node}
    @param dest_node: Destination node

    """
    instance = self._instance
    src_node = instance.primary_node

    assert len(self._snap_disks) == len(instance.disks)

    transfers = []

    for idx, dev in enumerate(self._snap_disks):
      if not dev:
        # Snapshot creation failed; None marks the transfer as failed
        transfers.append(None)
        continue

      path = utils.PathJoin(constants.EXPORT_DIR, "%s.new" % instance.name,
                            dev.physical_id[1])

      finished_fn = compat.partial(self._TransferFinished, idx)

      # FIXME: pass debug option from opcode to backend
      dt = DiskTransfer("snapshot/%s" % idx,
                        constants.IEIO_SCRIPT, (dev, idx),
                        constants.IEIO_FILE, (path, ),
                        finished_fn)
      transfers.append(dt)

    # Actually export data
    dresults = TransferInstanceData(self._lu, self._feedback_fn,
                                    src_node, dest_node.name,
                                    dest_node.secondary_ip,
                                    instance, transfers)

    assert len(dresults) == len(instance.disks)

    self._feedback_fn("Finalizing export on %s" % dest_node.name)
    result = self._lu.rpc.call_finalize_export(dest_node.name, instance,
                                               self._snap_disks)
    msg = result.fail_msg
    fin_resu = not msg
    if msg:
      self._lu.LogWarning("Could not finalize export for instance %s"
                          " on node %s: %s", instance.name, dest_node.name, msg)

    return (fin_resu, dresults)

  def RemoteExport(self, opts, disk_info, timeouts):
    """Inter-cluster instance export.

    @type opts: L{objects.ImportExportOptions}
    @param opts: Import/export daemon options
    @type disk_info: list
    @param disk_info: Per-disk destination information
    @type timeouts: L{ImportExportTimeouts}
    @param timeouts: Timeouts for this import

    """
    instance = self._instance

    assert len(disk_info) == len(instance.disks)

    cbs = _RemoteExportCb(self._feedback_fn, len(instance.disks))

    ieloop = ImportExportLoop(self._lu)
    try:
      for idx, (dev, (host, port)) in enumerate(zip(instance.disks,
                                                    disk_info)):
        self._feedback_fn("Sending disk %s to %s:%s" % (idx, host, port))
        finished_fn = compat.partial(self._TransferFinished, idx)
        ieloop.Add(DiskExport(self._lu, instance.primary_node,
                              opts, host, port, instance,
                              constants.IEIO_SCRIPT, (dev, idx),
                              timeouts, cbs, private=(idx, finished_fn)))

      ieloop.Run()
    finally:
      ieloop.FinalizeAll()

    return (True, cbs.disk_results)

  def _TransferFinished(self, idx):
    """Called once a transfer has finished.

    @type idx: number
    @param idx: Disk index

    """
    logging.debug("Transfer %s finished", idx)
    # The snapshot is no longer needed once its data has been copied
    self._RemoveSnapshot(idx)

  def Cleanup(self):
    """Remove all snapshots.

    """
    assert len(self._removed_snaps) == len(self._instance.disks)
    for idx in range(len(self._instance.disks)):
      self._RemoveSnapshot(idx)
class _RemoteImportCb(ImportExportCbBase):
  def __init__(self, feedback_fn, cds, x509_cert_pem, disk_count,
               external_address):
    """Initializes this class.

    @type cds: string
    @param cds: Cluster domain secret
    @type x509_cert_pem: string
    @param x509_cert_pem: CA used for signing import key
    @type disk_count: number
    @param disk_count: Number of disks
    @type external_address: string
    @param external_address: External address of destination node

    """
    ImportExportCbBase.__init__(self)
    self._feedback_fn = feedback_fn
    self._cds = cds
    self._x509_cert_pem = x509_cert_pem
    self._disk_count = disk_count
    self._external_address = external_address

    # Per-disk final results (None until a transfer finishes)
    self._dresults = [None] * disk_count
    # Listening port of each disk's import daemon; None while not listening
    self._daemon_port = [None] * disk_count

    # Salt for signing the per-disk connection information
    self._salt = utils.GenerateSecret(8)

  @property
  def disk_results(self):
    """Returns per-disk results.

    """
    return self._dresults

  def _CheckAllListening(self):
    """Checks whether all daemons are listening.

    If all daemons are listening, the information is sent to the client.

    """
    # Wait until every daemon has reported its listening port
    if not compat.all(dp is not None for dp in self._daemon_port):
      return

    host = self._external_address

    disks = []
    for idx, port in enumerate(self._daemon_port):
      # Each entry is signed so the remote cluster can verify it
      disks.append(ComputeRemoteImportDiskInfo(self._cds, self._salt,
                                               idx, host, port))

    assert len(disks) == self._disk_count

    self._feedback_fn(constants.ELOG_REMOTE_IMPORT, {
      "disks": disks,
      "x509_ca": self._x509_cert_pem,
      })

  def ReportListening(self, ie, private):
    """Called when daemon started listening.

    """
    (idx, ) = private

    self._feedback_fn("Disk %s is now listening" % idx)

    assert self._daemon_port[idx] is None

    self._daemon_port[idx] = ie.listen_port

    # Publish connection information once all daemons are ready
    self._CheckAllListening()

  def ReportConnected(self, ie, private):
    """Called when a connection has been established.

    """
    (idx, ) = private

    self._feedback_fn("Disk %s is now receiving data" % idx)

  def ReportFinished(self, ie, private):
    """Called when a transfer has finished.

    """
    (idx, ) = private

    # Daemon is certainly no longer listening
    self._daemon_port[idx] = None

    if ie.success:
      self._feedback_fn("Disk %s finished receiving data" % idx)
    else:
      self._feedback_fn(("Disk %s failed to receive data: %s"
                         " (recent output: %r)") %
                        (idx, ie.final_message, ie.recent_output))

    self._dresults[idx] = bool(ie.success)
def RemoteImport(lu, feedback_fn, instance, source_x509_ca, cds, timeouts):
  """Imports an instance from another cluster.

  @param lu: Logical unit instance
  @param feedback_fn: Feedback function
  @type instance: L{objects.Instance}
  @param instance: Instance object
  @type source_x509_ca: OpenSSL.crypto.X509
  @param source_x509_ca: Import source's X509 CA
  @type cds: string
  @param cds: Cluster domain secret
  @type timeouts: L{ImportExportTimeouts}
  @param timeouts: Timeouts for this import
  @return: Per-disk results, see L{_RemoteImportCb.disk_results}

  """
  source_ca_pem = OpenSSL.crypto.dump_certificate(OpenSSL.crypto.FILETYPE_PEM,
                                                  source_x509_ca)

  # Create crypto key
  result = lu.rpc.call_x509_cert_create(instance.primary_node,
                                        constants.RIE_CERT_VALIDITY)
  result.Raise("Can't create X509 key and certificate on %s" % result.node)

  (x509_key_name, x509_cert_pem) = result.payload
  try:
    # Load certificate
    x509_cert = OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_PEM,
                                                x509_cert_pem)

    # Import daemon options
    opts = objects.ImportExportOptions(key_name=x509_key_name,
                                       ca_pem=source_ca_pem)

    # Sign certificate with the cluster domain secret
    signed_x509_cert_pem = \
      utils.SignX509Certificate(x509_cert, cds, utils.GenerateSecret(8))

    cbs = _RemoteImportCb(feedback_fn, cds, signed_x509_cert_pem,
                          len(instance.disks), instance.primary_node)

    # One import daemon per disk, all driven by the same loop
    ieloop = ImportExportLoop(lu)
    try:
      for idx, dev in enumerate(instance.disks):
        ieloop.Add(DiskImport(lu, instance.primary_node, opts, instance,
                              constants.IEIO_SCRIPT, (dev, idx),
                              timeouts, cbs, private=(idx, )))

      ieloop.Run()
    finally:
      ieloop.FinalizeAll()
  finally:
    # Remove crypto key and certificate
    result = lu.rpc.call_x509_cert_remove(instance.primary_node, x509_key_name)
    result.Raise("Can't remove X509 key and certificate on %s" % result.node)

  return cbs.disk_results
def _GetImportExportHandshakeMessage(version):
  """Returns the handshake message for a RIE protocol version.

  @type version: number
  @rtype: string

  """
  # Message format is "<version>:<handshake constant>"
  return ":".join(["%s" % version, "%s" % constants.RIE_HANDSHAKE])
def ComputeRemoteExportHandshake(cds):
  """Computes the remote import/export handshake.

  @type cds: string
  @param cds: Cluster domain secret

  """
  hmac_salt = utils.GenerateSecret(8)
  hmac_msg = _GetImportExportHandshakeMessage(constants.RIE_VERSION)
  hmac_digest = utils.Sha1Hmac(cds, hmac_msg, salt=hmac_salt)
  return (constants.RIE_VERSION, hmac_digest, hmac_salt)
def CheckRemoteExportHandshake(cds, handshake):
  """Checks the handshake of a remote import/export.

  @type cds: string
  @param cds: Cluster domain secret
  @type handshake: sequence
  @param handshake: Handshake sent by remote peer
  @return: None if the handshake is valid, an error message otherwise

  """
  try:
    (version, hmac_digest, hmac_salt) = handshake
  except (TypeError, ValueError), err:
    return "Invalid data: %s" % err

  # Verify the HMAC before looking at the version number
  if not utils.VerifySha1Hmac(cds, _GetImportExportHandshakeMessage(version),
                              hmac_digest, salt=hmac_salt):
    return "Hash didn't match, clusters don't share the same domain secret"

  if version != constants.RIE_VERSION:
    return ("Clusters don't have the same remote import/export protocol"
            " (local=%s, remote=%s)" %
            (constants.RIE_VERSION, version))

  return None
def _GetRieDiskInfoMessage(disk_index, host, port):
1487
  """Returns the hashed text for import/export disk information.
1488

1489
  @type disk_index: number
1490
  @param disk_index: Index of disk (included in hash)
1491
  @type host: string
1492
  @param host: Hostname
1493
  @type port: number
1494
  @param port: Daemon port
1495

1496
  """
1497
  return "%s:%s:%s" % (disk_index, host, port)
1498

    
1499

    
1500
def CheckRemoteExportDiskInfo(cds, disk_index, disk_info):
  """Verifies received disk information for an export.

  @type cds: string
  @param cds: Cluster domain secret
  @type disk_index: number
  @param disk_index: Index of disk (included in hash)
  @type disk_info: sequence
  @param disk_info: Disk information sent by remote peer
  @return: Tuple of normalized host name and validated port
  @raise errors.GenericError: If the data is malformed or the HMAC is wrong

  """
  try:
    (host, port, hmac_digest, hmac_salt) = disk_info
  except (TypeError, ValueError), err:
    raise errors.GenericError("Invalid data: %s" % err)

  if not (host and port):
    raise errors.GenericError("Missing destination host or port")

  msg = _GetRieDiskInfoMessage(disk_index, host, port)

  if not utils.VerifySha1Hmac(cds, msg, hmac_digest, salt=hmac_salt):
    raise errors.GenericError("HMAC is wrong")

  return (utils.HostInfo.NormalizeName(host),
          utils.ValidateServiceName(port))
def ComputeRemoteImportDiskInfo(cds, salt, disk_index, host, port):
  """Computes the signed disk information for a remote import.

  @type cds: string
  @param cds: Cluster domain secret
  @type salt: string
  @param salt: HMAC salt
  @type disk_index: number
  @param disk_index: Index of disk (included in hash)
  @type host: string
  @param host: Hostname
  @type port: number
  @param port: Daemon port

  """
  hmac_msg = _GetRieDiskInfoMessage(disk_index, host, port)
  hmac_digest = utils.Sha1Hmac(cds, hmac_msg, salt=salt)
  return (host, port, hmac_digest, salt)