Statistics
| Branch: | Tag: | Revision:

root / lib / masterd / instance.py @ d51ae04c

History | View | Annotate | Download (43.3 kB)

1
#
2
#
3

    
4
# Copyright (C) 2010 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Instance-related functions and classes for masterd.
23

24
"""
25

    
26
import logging
27
import time
28
import OpenSSL
29

    
30
from ganeti import constants
31
from ganeti import errors
32
from ganeti import compat
33
from ganeti import utils
34
from ganeti import objects
35

    
36

    
37
class _ImportExportError(Exception):
  """Local exception to report import/export errors.

  Raised by the import/export helpers in this module; callers within this
  module catch it and record it as the transfer's final error message (see
  the C{Finalize(error=...)} calls in L{ImportExportLoop}).

  """
41

    
42

    
43
class ImportExportTimeouts(object):
  """Container for the various timeouts of a disk import/export.

  """
  #: Time until daemon starts writing status file
  DEFAULT_READY_TIMEOUT = 10

  #: Length of time until errors cause hard failure
  DEFAULT_ERROR_TIMEOUT = 10

  #: Time after which daemon must be listening
  DEFAULT_LISTEN_TIMEOUT = 10

  #: Progress update interval
  DEFAULT_PROGRESS_INTERVAL = 60

  __slots__ = [
    "connect",
    "error",
    "listen",
    "progress",
    "ready",
    ]

  def __init__(self, connect,
               listen=DEFAULT_LISTEN_TIMEOUT,
               error=DEFAULT_ERROR_TIMEOUT,
               ready=DEFAULT_READY_TIMEOUT,
               progress=DEFAULT_PROGRESS_INTERVAL):
    """Initializes this class.

    Only the connect timeout is mandatory; all others fall back to the
    class-level defaults.

    @type connect: number
    @param connect: Timeout for establishing connection
    @type listen: number
    @param listen: Timeout for starting to listen for connections
    @type error: number
    @param error: Length of time until errors cause hard failure
    @type ready: number
    @param ready: Timeout for daemon to become ready
    @type progress: number
    @param progress: Progress update interval

    """
    self.connect = connect
    self.listen = listen
    self.error = error
    self.ready = ready
    self.progress = progress
88

    
89

    
90
class ImportExportCbBase(object):
  """Callbacks for disk import/export.

  Every callback is a no-op here; subclasses override the events they are
  interested in.

  """
  def ReportListening(self, ie, private):
    """Called when daemon started listening.

    @type ie: Subclass of L{_DiskImportExportBase}
    @param ie: Import/export object
    @param private: Private data passed to import/export object

    """

  def ReportConnected(self, ie, private):
    """Called when a connection has been established.

    @type ie: Subclass of L{_DiskImportExportBase}
    @param ie: Import/export object
    @param private: Private data passed to import/export object

    """

  def ReportProgress(self, ie, private):
    """Called when new progress information should be reported.

    @type ie: Subclass of L{_DiskImportExportBase}
    @param ie: Import/export object
    @param private: Private data passed to import/export object

    """

  def ReportFinished(self, ie, private):
    """Called when a transfer has finished.

    @type ie: Subclass of L{_DiskImportExportBase}
    @param ie: Import/export object
    @param private: Private data passed to import/export object

    """
129

    
130

    
131
def _TimeoutExpired(epoch, timeout, _time_fn=time.time):
132
  """Checks whether a timeout has expired.
133

134
  """
135
  return _time_fn() > (epoch + timeout)
136

    
137

    
138
class _DiskImportExportBase(object):
  """Base class for a single disk import or export.

  Tracks the lifecycle of one import/export daemon on a node: starting it,
  polling its status, reporting progress/connection events through the
  callbacks, and finalizing (cleaning up) the daemon once the transfer is
  over or has failed.

  """
  #: Human-readable mode name ("import" or "export"); must be overridden by
  #: subclasses (checked by the assertion in __init__)
  MODE_TEXT = None

  def __init__(self, lu, node_name, opts,
               instance, timeouts, cbs, private=None):
    """Initializes this class.

    @param lu: Logical unit instance
    @type node_name: string
    @param node_name: Node name for import
    @type opts: L{objects.ImportExportOptions}
    @param opts: Import/export daemon options
    @type instance: L{objects.Instance}
    @param instance: Instance object
    @type timeouts: L{ImportExportTimeouts}
    @param timeouts: Timeouts for this import
    @type cbs: L{ImportExportCbBase}
    @param cbs: Callbacks
    @param private: Private data for callback functions

    """
    assert self.MODE_TEXT

    self._lu = lu
    self.node_name = node_name
    self._opts = opts
    self._instance = instance
    self._timeouts = timeouts
    self._cbs = cbs
    self._private = private

    # Parent loop
    self._loop = None

    # Timestamps
    self._ts_begin = None
    self._ts_connected = None
    self._ts_finished = None
    self._ts_cleanup = None
    self._ts_last_progress = None
    self._ts_last_error = None

    # Transfer status; None while still active, then True/False
    self.success = None
    self.final_message = None

    # Daemon status
    self._daemon_name = None
    self._daemon = None

  @property
  def recent_output(self):
    """Returns the most recent output from the daemon.

    @return: Recent daemon output, or None if no status data has been
      received yet

    """
    if self._daemon:
      return self._daemon.recent_output

    return None

  @property
  def progress(self):
    """Returns transfer progress information.

    @return: Tuple of (mbytes, throughput, percent, ETA), or None if no
      status data has been received yet

    """
    if not self._daemon:
      return None

    return (self._daemon.progress_mbytes,
            self._daemon.progress_throughput,
            self._daemon.progress_percent,
            self._daemon.progress_eta)

  @property
  def magic(self):
    """Returns the magic value for this import/export.

    """
    return self._opts.magic

  @property
  def active(self):
    """Determines whether this transport is still active.

    @rtype: bool

    """
    return self.success is None

  @property
  def loop(self):
    """Returns parent loop.

    @rtype: L{ImportExportLoop}

    """
    return self._loop

  def SetLoop(self, loop):
    """Sets the parent loop.

    @type loop: L{ImportExportLoop}

    """
    if self._loop:
      raise errors.ProgrammerError("Loop can only be set once")

    self._loop = loop

  def _StartDaemon(self):
    """Starts the import/export daemon.

    Must be implemented by subclasses (import vs. export use different RPC
    calls).

    """
    raise NotImplementedError()

  def CheckDaemon(self):
    """Checks whether daemon has been started and if not, starts it.

    @rtype: string
    @return: Daemon name

    """
    assert self._ts_cleanup is None

    if self._daemon_name is None:
      assert self._ts_begin is None

      result = self._StartDaemon()
      if result.fail_msg:
        raise _ImportExportError("Failed to start %s on %s: %s" %
                                 (self.MODE_TEXT, self.node_name,
                                  result.fail_msg))

      daemon_name = result.payload

      logging.info("Started %s %r on %s", self.MODE_TEXT, daemon_name,
                   self.node_name)

      self._ts_begin = time.time()
      self._daemon_name = daemon_name

    return self._daemon_name

  def GetDaemonName(self):
    """Returns the daemon name.

    @rtype: string

    """
    assert self._daemon_name, "Daemon has not been started"
    assert self._ts_cleanup is None
    return self._daemon_name

  def Abort(self):
    """Sends SIGTERM to import/export daemon (if still active).

    @rtype: bool
    @return: Whether the abort RPC succeeded (also True if there was no
      daemon to abort)

    """
    if self._daemon_name:
      self._lu.LogWarning("Aborting %s %r on %s",
                          self.MODE_TEXT, self._daemon_name, self.node_name)
      result = self._lu.rpc.call_impexp_abort(self.node_name, self._daemon_name)
      if result.fail_msg:
        self._lu.LogWarning("Failed to abort %s %r on %s: %s",
                            self.MODE_TEXT, self._daemon_name,
                            self.node_name, result.fail_msg)
        return False

    return True

  def _SetDaemonData(self, data):
    """Internal function for updating status daemon data.

    @type data: L{objects.ImportExportStatus}
    @param data: Daemon status data
    @rtype: bool
    @return: Whether status data was available (False means "not ready yet,
      retry later")

    """
    assert self._ts_begin is not None

    if not data:
      # No status file written yet; give the daemon some time before
      # declaring hard failure
      if _TimeoutExpired(self._ts_begin, self._timeouts.ready):
        raise _ImportExportError("Didn't become ready after %s seconds" %
                                 self._timeouts.ready)

      return False

    self._daemon = data

    return True

  def SetDaemonData(self, success, data):
    """Updates daemon status data.

    @type success: bool
    @param success: Whether fetching data was successful or not
    @type data: L{objects.ImportExportStatus}
    @param data: Daemon status data
    @rtype: bool
    @return: Whether usable status data was stored

    """
    if not success:
      # Tolerate transient fetch errors up to the error timeout
      if self._ts_last_error is None:
        self._ts_last_error = time.time()

      elif _TimeoutExpired(self._ts_last_error, self._timeouts.error):
        raise _ImportExportError("Too many errors while updating data")

      return False

    # Fetch succeeded, reset error tracking
    self._ts_last_error = None

    return self._SetDaemonData(data)

  def CheckListening(self):
    """Checks whether the daemon is listening.

    Must be implemented by subclasses.

    """
    raise NotImplementedError()

  def _GetConnectedCheckEpoch(self):
    """Returns timeout to calculate connect timeout.

    Must be implemented by subclasses.

    """
    raise NotImplementedError()

  def CheckConnected(self):
    """Checks whether the daemon is connected.

    @rtype: bool
    @return: Whether the daemon is connected

    """
    assert self._daemon, "Daemon status missing"

    # Only report the connection event once
    if self._ts_connected is not None:
      return True

    if self._daemon.connected:
      self._ts_connected = time.time()

      # TODO: Log remote peer
      logging.debug("%s %r on %s is now connected",
                    self.MODE_TEXT, self._daemon_name, self.node_name)

      self._cbs.ReportConnected(self, self._private)

      return True

    if _TimeoutExpired(self._GetConnectedCheckEpoch(), self._timeouts.connect):
      raise _ImportExportError("Not connected after %s seconds" %
                               self._timeouts.connect)

    return False

  def _CheckProgress(self):
    """Checks whether a progress update should be reported.

    Only reports when the progress interval has elapsed and the daemon has
    actual progress numbers available.

    """
    if ((self._ts_last_progress is None or
         _TimeoutExpired(self._ts_last_progress, self._timeouts.progress)) and
        self._daemon and
        self._daemon.progress_mbytes is not None and
        self._daemon.progress_throughput is not None):
      self._cbs.ReportProgress(self, self._private)
      self._ts_last_progress = time.time()

  def CheckFinished(self):
    """Checks whether the daemon exited.

    @rtype: bool
    @return: Whether the transfer is finished

    """
    assert self._daemon, "Daemon status missing"

    if self._ts_finished:
      return True

    if self._daemon.exit_status is None:
      # Daemon still running; use the opportunity to report progress
      # TODO: Adjust delay for ETA expiring soon
      self._CheckProgress()
      return False

    self._ts_finished = time.time()

    self._ReportFinished(self._daemon.exit_status == 0,
                         self._daemon.error_message)

    return True

  def _ReportFinished(self, success, message):
    """Transfer is finished or daemon exited.

    @type success: bool
    @param success: Whether the transfer was successful
    @type message: string
    @param message: Error message

    """
    assert self.success is None

    self.success = success
    self.final_message = message

    if success:
      logging.info("%s %r on %s succeeded", self.MODE_TEXT, self._daemon_name,
                   self.node_name)
    elif self._daemon_name:
      self._lu.LogWarning("%s %r on %s failed: %s",
                          self.MODE_TEXT, self._daemon_name, self.node_name,
                          message)
    else:
      self._lu.LogWarning("%s on %s failed: %s", self.MODE_TEXT,
                          self.node_name, message)

    self._cbs.ReportFinished(self, self._private)

  def _Finalize(self):
    """Makes the RPC call to finalize this import/export.

    """
    return self._lu.rpc.call_impexp_cleanup(self.node_name, self._daemon_name)

  def Finalize(self, error=None):
    """Finalizes this import/export.

    @param error: Optional error message to record on the transfer
    @rtype: bool
    @return: Whether finalization succeeded

    """
    if self._daemon_name:
      logging.info("Finalizing %s %r on %s",
                   self.MODE_TEXT, self._daemon_name, self.node_name)

      result = self._Finalize()
      if result.fail_msg:
        self._lu.LogWarning("Failed to finalize %s %r on %s: %s",
                            self.MODE_TEXT, self._daemon_name,
                            self.node_name, result.fail_msg)
        return False

      # Daemon is no longer running
      self._daemon_name = None
      self._ts_cleanup = time.time()

    if error:
      self._ReportFinished(False, error)

    return True
478

    
479

    
480
class DiskImport(_DiskImportExportBase):
  """Handles a single disk import (receiving side).

  """
  MODE_TEXT = "import"

  def __init__(self, lu, node_name, opts, instance,
               dest, dest_args, timeouts, cbs, private=None):
    """Initializes this class.

    @param lu: Logical unit instance
    @type node_name: string
    @param node_name: Node name for import
    @type opts: L{objects.ImportExportOptions}
    @param opts: Import/export daemon options
    @type instance: L{objects.Instance}
    @param instance: Instance object
    @param dest: I/O destination
    @param dest_args: I/O arguments
    @type timeouts: L{ImportExportTimeouts}
    @param timeouts: Timeouts for this import
    @type cbs: L{ImportExportCbBase}
    @param cbs: Callbacks
    @param private: Private data for callback functions

    """
    _DiskImportExportBase.__init__(self, lu, node_name, opts,
                                   instance, timeouts, cbs, private)

    self._dest = dest
    self._dest_args = dest_args

    # Timestamp of when the daemon was first seen listening
    self._ts_listening = None

  @property
  def listen_port(self):
    """Returns the port the daemon is listening on.

    @return: Listen port, or None if no daemon status is available yet

    """
    if not self._daemon:
      return None

    return self._daemon.listen_port

  def _StartDaemon(self):
    """Starts the import daemon.

    """
    return self._lu.rpc.call_import_start(self.node_name, self._opts,
                                          self._instance,
                                          self._dest, self._dest_args)

  def CheckListening(self):
    """Checks whether the daemon is listening.

    @rtype: bool
    @return: Whether the daemon is listening

    """
    assert self._daemon, "Daemon status missing"

    # Only report the listening event once
    if self._ts_listening is not None:
      return True

    port = self._daemon.listen_port
    if port is None:
      # Not listening yet; enforce the listen timeout
      if _TimeoutExpired(self._ts_begin, self._timeouts.listen):
        raise _ImportExportError("Not listening after %s seconds" %
                                 self._timeouts.listen)

      return False

    self._ts_listening = time.time()

    logging.debug("Import %r on %s is now listening on port %s",
                  self._daemon_name, self.node_name, port)

    self._cbs.ReportListening(self, self._private)

    return True

  def _GetConnectedCheckEpoch(self):
    """Returns the time since we started listening.

    """
    assert self._ts_listening is not None, \
           ("Checking whether an import is connected is only useful"
            " once it's been listening")

    return self._ts_listening
567

    
568

    
569
class DiskExport(_DiskImportExportBase):
  """Handles a single disk export (sending side).

  """
  MODE_TEXT = "export"

  def __init__(self, lu, node_name, opts,
               dest_host, dest_port, instance, source, source_args,
               timeouts, cbs, private=None):
    """Initializes this class.

    @param lu: Logical unit instance
    @type node_name: string
    @param node_name: Node name for import
    @type opts: L{objects.ImportExportOptions}
    @param opts: Import/export daemon options
    @type dest_host: string
    @param dest_host: Destination host name or IP address
    @type dest_port: number
    @param dest_port: Destination port number
    @type instance: L{objects.Instance}
    @param instance: Instance object
    @param source: I/O source
    @param source_args: I/O source arguments
    @type timeouts: L{ImportExportTimeouts}
    @param timeouts: Timeouts for this import
    @type cbs: L{ImportExportCbBase}
    @param cbs: Callbacks
    @param private: Private data for callback functions

    """
    _DiskImportExportBase.__init__(self, lu, node_name, opts,
                                   instance, timeouts, cbs, private)

    self._dest_host = dest_host
    self._dest_port = dest_port
    self._source = source
    self._source_args = source_args

  def _StartDaemon(self):
    """Starts the export daemon.

    """
    return self._lu.rpc.call_export_start(self.node_name, self._opts,
                                          self._dest_host, self._dest_port,
                                          self._instance, self._source,
                                          self._source_args)

  def CheckListening(self):
    """Checks whether the daemon is listening.

    @rtype: bool

    """
    # Only an import can be listening; exports connect out
    return True

  def _GetConnectedCheckEpoch(self):
    """Returns the time since the daemon started.

    """
    assert self._ts_begin is not None

    return self._ts_begin
627

    
628

    
629
def FormatProgress(progress):
  """Formats progress information for user consumption

  @type progress: tuple
  @param progress: Tuple as returned by L{_DiskImportExportBase.progress}:
    (mbytes transferred, throughput in MiB/s, percent done or None,
    ETA or None)
  @rtype: string

  """
  mbytes, throughput, percent, eta = progress

  # Not using FormatUnit for the throughput as it doesn't support kilobytes
  parts = [utils.FormatUnit(mbytes, "h"), "%0.1f MiB/s" % throughput]

  if percent is not None:
    parts.append("%d%%" % percent)

  if eta is not None:
    parts.append("ETA %s" % utils.FormatSeconds(eta))

  return utils.CommaJoin(parts)
649

    
650

    
651
class ImportExportLoop:
652
  MIN_DELAY = 1.0
653
  MAX_DELAY = 20.0
654

    
655
  def __init__(self, lu):
656
    """Initializes this class.
657

658
    """
659
    self._lu = lu
660
    self._queue = []
661
    self._pending_add = []
662

    
663
  def Add(self, diskie):
664
    """Adds an import/export object to the loop.
665

666
    @type diskie: Subclass of L{_DiskImportExportBase}
667
    @param diskie: Import/export object
668

669
    """
670
    assert diskie not in self._pending_add
671
    assert diskie.loop is None
672

    
673
    diskie.SetLoop(self)
674

    
675
    # Adding new objects to a staging list is necessary, otherwise the main
676
    # loop gets confused if callbacks modify the queue while the main loop is
677
    # iterating over it.
678
    self._pending_add.append(diskie)
679

    
680
  @staticmethod
681
  def _CollectDaemonStatus(lu, daemons):
682
    """Collects the status for all import/export daemons.
683

684
    """
685
    daemon_status = {}
686

    
687
    for node_name, names in daemons.iteritems():
688
      result = lu.rpc.call_impexp_status(node_name, names)
689
      if result.fail_msg:
690
        lu.LogWarning("Failed to get daemon status on %s: %s",
691
                      node_name, result.fail_msg)
692
        continue
693

    
694
      assert len(names) == len(result.payload)
695

    
696
      daemon_status[node_name] = dict(zip(names, result.payload))
697

    
698
    return daemon_status
699

    
700
  @staticmethod
701
  def _GetActiveDaemonNames(queue):
702
    """Gets the names of all active daemons.
703

704
    """
705
    result = {}
706
    for diskie in queue:
707
      if not diskie.active:
708
        continue
709

    
710
      try:
711
        # Start daemon if necessary
712
        daemon_name = diskie.CheckDaemon()
713
      except _ImportExportError, err:
714
        logging.exception("%s failed", diskie.MODE_TEXT)
715
        diskie.Finalize(error=str(err))
716
        continue
717

    
718
      result.setdefault(diskie.node_name, []).append(daemon_name)
719

    
720
    assert len(queue) >= len(result)
721
    assert len(queue) >= sum([len(names) for names in result.itervalues()])
722

    
723
    logging.debug("daemons=%r", result)
724

    
725
    return result
726

    
727
  def _AddPendingToQueue(self):
728
    """Adds all pending import/export objects to the internal queue.
729

730
    """
731
    assert compat.all(diskie not in self._queue and diskie.loop == self
732
                      for diskie in self._pending_add)
733

    
734
    self._queue.extend(self._pending_add)
735

    
736
    del self._pending_add[:]
737

    
738
  def Run(self):
739
    """Utility main loop.
740

741
    """
742
    while True:
743
      self._AddPendingToQueue()
744

    
745
      # Collect all active daemon names
746
      daemons = self._GetActiveDaemonNames(self._queue)
747
      if not daemons:
748
        break
749

    
750
      # Collection daemon status data
751
      data = self._CollectDaemonStatus(self._lu, daemons)
752

    
753
      # Use data
754
      delay = self.MAX_DELAY
755
      for diskie in self._queue:
756
        if not diskie.active:
757
          continue
758

    
759
        try:
760
          try:
761
            all_daemon_data = data[diskie.node_name]
762
          except KeyError:
763
            result = diskie.SetDaemonData(False, None)
764
          else:
765
            result = \
766
              diskie.SetDaemonData(True,
767
                                   all_daemon_data[diskie.GetDaemonName()])
768

    
769
          if not result:
770
            # Daemon not yet ready, retry soon
771
            delay = min(3.0, delay)
772
            continue
773

    
774
          if diskie.CheckFinished():
775
            # Transfer finished
776
            diskie.Finalize()
777
            continue
778

    
779
          # Normal case: check again in 5 seconds
780
          delay = min(5.0, delay)
781

    
782
          if not diskie.CheckListening():
783
            # Not yet listening, retry soon
784
            delay = min(1.0, delay)
785
            continue
786

    
787
          if not diskie.CheckConnected():
788
            # Not yet connected, retry soon
789
            delay = min(1.0, delay)
790
            continue
791

    
792
        except _ImportExportError, err:
793
          logging.exception("%s failed", diskie.MODE_TEXT)
794
          diskie.Finalize(error=str(err))
795

    
796
      if not compat.any([diskie.active for diskie in self._queue]):
797
        break
798

    
799
      # Wait a bit
800
      delay = min(self.MAX_DELAY, max(self.MIN_DELAY, delay))
801
      logging.debug("Waiting for %ss", delay)
802
      time.sleep(delay)
803

    
804
  def FinalizeAll(self):
805
    """Finalizes all pending transfers.
806

807
    """
808
    success = True
809

    
810
    for diskie in self._queue:
811
      success = diskie.Finalize() and success
812

    
813
    return success
814

    
815

    
816
class _TransferInstCbBase(ImportExportCbBase):
  """Shared state for the callbacks used during an instance data transfer.

  """
  def __init__(self, lu, feedback_fn, instance, timeouts, src_node, src_cbs,
               dest_node, dest_ip):
    """Initializes this class.

    @param lu: Logical unit instance
    @param feedback_fn: Feedback function
    @type instance: L{objects.Instance}
    @param instance: Instance object
    @type timeouts: L{ImportExportTimeouts}
    @param timeouts: Timeouts for the transfer
    @type src_node: string
    @param src_node: Source node name
    @param src_cbs: Source-side callbacks, if any
    @type dest_node: string
    @param dest_node: Destination node name
    @param dest_ip: IP address of destination node

    """
    ImportExportCbBase.__init__(self)

    self.lu = lu
    self.feedback_fn = feedback_fn
    self.instance = instance
    self.timeouts = timeouts
    self.src_node = src_node
    self.src_cbs = src_cbs
    self.dest_node = dest_node
    self.dest_ip = dest_ip
832

    
833

    
834
class _TransferInstSourceCb(_TransferInstCbBase):
  """Callbacks for the source (export) side of an instance transfer.

  """
  def ReportConnected(self, ie, dtp):
    """Called when a connection has been established.

    """
    assert self.src_cbs is None
    assert dtp.src_export == ie
    assert dtp.dest_import

    msg = "%s is sending data on %s" % (dtp.data.name, ie.node_name)
    self.feedback_fn(msg)

  def ReportProgress(self, ie, dtp):
    """Called when new progress information should be reported.

    """
    progress = ie.progress
    if progress:
      self.feedback_fn("%s sent %s" % (dtp.data.name,
                                       FormatProgress(progress)))

  def ReportFinished(self, ie, dtp):
    """Called when a transfer has finished.

    """
    assert self.src_cbs is None
    assert dtp.src_export == ie
    assert dtp.dest_import

    if ie.success:
      msg = "%s finished sending data" % dtp.data.name
    else:
      msg = ("%s failed to send data: %s (recent output: %r)" %
             (dtp.data.name, ie.final_message, ie.recent_output))
    self.feedback_fn(msg)

    dtp.RecordResult(ie.success)

    # Run the per-transfer completion hook, if one was supplied
    finished_fn = dtp.data.finished_fn
    if finished_fn:
      finished_fn()

    # TODO: Check whether sending SIGTERM right away is okay, maybe we should
    # give the daemon a moment to sort things out
    if not ie.success and dtp.dest_import:
      dtp.dest_import.Abort()
880

    
881

    
882
class _TransferInstDestCb(_TransferInstCbBase):
  """Callbacks for the destination (import) side of an instance transfer.

  """
  def ReportListening(self, ie, dtp):
    """Called when daemon started listening.

    """
    assert self.src_cbs
    assert dtp.src_export is None
    assert dtp.dest_import
    assert dtp.export_opts

    self.feedback_fn("%s is now listening, starting export" % dtp.data.name)

    # The import daemon is ready; start the matching export on the source
    # node, pointing it at the port the import is listening on
    export = DiskExport(self.lu, self.src_node, dtp.export_opts,
                        self.dest_ip, ie.listen_port,
                        self.instance, dtp.data.src_io, dtp.data.src_ioargs,
                        self.timeouts, self.src_cbs, private=dtp)
    ie.loop.Add(export)

    dtp.src_export = export

  def ReportConnected(self, ie, dtp):
    """Called when a connection has been established.

    """
    msg = "%s is receiving data on %s" % (dtp.data.name, self.dest_node)
    self.feedback_fn(msg)

  def ReportFinished(self, ie, dtp):
    """Called when a transfer has finished.

    """
    if ie.success:
      msg = "%s finished receiving data" % dtp.data.name
    else:
      msg = ("%s failed to receive data: %s (recent output: %r)" %
             (dtp.data.name, ie.final_message, ie.recent_output))
    self.feedback_fn(msg)

    dtp.RecordResult(ie.success)

    # TODO: Check whether sending SIGTERM right away is okay, maybe we should
    # give the daemon a moment to sort things out
    if not ie.success and dtp.src_export:
      dtp.src_export.Abort()
926

    
927

    
928
class DiskTransfer(object):
  """Description of a single disk transfer (source and destination I/O).

  """
  def __init__(self, name, src_io, src_ioargs, dest_io, dest_ioargs,
               finished_fn):
    """Initializes this class.

    @type name: string
    @param name: User-visible name for this transfer (e.g. "disk/0")
    @param src_io: Source I/O type
    @param src_ioargs: Source I/O arguments
    @param dest_io: Destination I/O type
    @param dest_ioargs: Destination I/O arguments
    @type finished_fn: callable
    @param finished_fn: Function called once transfer has finished

    """
    self.name = name
    self.finished_fn = finished_fn

    # Source side
    self.src_io = src_io
    self.src_ioargs = src_ioargs

    # Destination side
    self.dest_io = dest_io
    self.dest_ioargs = dest_ioargs
952

    
953

    
954
class _DiskTransferPrivate(object):
955
  def __init__(self, data, success, export_opts):
956
    """Initializes this class.
957

958
    @type data: L{DiskTransfer}
959
    @type success: bool
960

961
    """
962
    self.data = data
963
    self.success = success
964
    self.export_opts = export_opts
965

    
966
    self.src_export = None
967
    self.dest_import = None
968

    
969
  def RecordResult(self, success):
970
    """Updates the status.
971

972
    One failed part will cause the whole transfer to fail.
973

974
    """
975
    self.success = self.success and success
976

    
977

    
978
def _GetInstDiskMagic(base, instance_name, index):
  """Computes the magic value for a disk export or import.

  @type base: string
  @param base: Random seed value (can be the same for all disks of a transfer)
  @type instance_name: string
  @param instance_name: Name of instance
  @type index: number
  @param index: Disk index
  @rtype: string
  @return: Hex digest identifying this particular disk transfer

  """
  # The inter-cluster protocol version is included in the hash, so magics
  # from different versions won't match
  h = compat.sha1_hash()
  for part in (str(constants.RIE_VERSION), base, instance_name, str(index)):
    h.update(part)
  return h.hexdigest()
995

    
996

    
997
def TransferInstanceData(lu, feedback_fn, src_node, dest_node, dest_ip,
                         instance, all_transfers):
  """Transfers an instance's data from one node to another.

  @param lu: Logical unit instance
  @param feedback_fn: Feedback function
  @type src_node: string
  @param src_node: Source node name
  @type dest_node: string
  @param dest_node: Destination node name
  @type dest_ip: string
  @param dest_ip: IP address of destination node
  @type instance: L{objects.Instance}
  @param instance: Instance object
  @type all_transfers: list of L{DiskTransfer} instances
  @param all_transfers: List of all disk transfers to be made
  @rtype: list
  @return: List with a boolean (True=successful, False=failed) for success for
           each transfer

  """
  # Disable compression for all moves as these are all within the same cluster
  compress = constants.IEC_NONE

  logging.debug("Source node %s, destination node %s, compression '%s'",
                src_node, dest_node, compress)

  timeouts = ImportExportTimeouts(constants.DISK_TRANSFER_CONNECT_TIMEOUT)
  src_cbs = _TransferInstSourceCb(lu, feedback_fn, instance, timeouts,
                                  src_node, None, dest_node, dest_ip)
  dest_cbs = _TransferInstDestCb(lu, feedback_fn, instance, timeouts,
                                 src_node, src_cbs, dest_node, dest_ip)

  all_dtp = []

  base_magic = utils.GenerateSecret(6)

  ieloop = ImportExportLoop(lu)
  try:
    for idx, transfer in enumerate(all_transfers):
      if transfer:
        feedback_fn("Exporting %s from %s to %s" %
                    (transfer.name, src_node, dest_node))

        magic = _GetInstDiskMagic(base_magic, instance.name, idx)
        opts = objects.ImportExportOptions(key_name=None, ca_pem=None,
                                           compress=compress, magic=magic)

        dtp = _DiskTransferPrivate(transfer, True, opts)

        di = DiskImport(lu, dest_node, opts, instance,
                        transfer.dest_io, transfer.dest_ioargs,
                        timeouts, dest_cbs, private=dtp)
        ieloop.Add(di)

        dtp.dest_import = di
      else:
        # Placeholder for a skipped disk; there are no export options for
        # it, hence the explicit None (the original two-argument call
        # raised TypeError against the three-parameter constructor)
        dtp = _DiskTransferPrivate(None, False, None)

      all_dtp.append(dtp)

    ieloop.Run()
  finally:
    # Always finalize daemons, even if the loop failed
    ieloop.FinalizeAll()

  assert len(all_dtp) == len(all_transfers)
  assert compat.all([(dtp.src_export is None or
                      dtp.src_export.success is not None) and
                     (dtp.dest_import is None or
                      dtp.dest_import.success is not None)
                     for dtp in all_dtp]), \
         "Not all imports/exports are finalized"

  return [bool(dtp.success) for dtp in all_dtp]


class _RemoteExportCb(ImportExportCbBase):
  def __init__(self, feedback_fn, disk_count):
    """Initializes this class.

    @param feedback_fn: Feedback function
    @type disk_count: number
    @param disk_count: Number of disks being exported

    """
    ImportExportCbBase.__init__(self)
    self._feedback_fn = feedback_fn
    # One result slot per disk, filled in by ReportFinished
    self._dresults = [None] * disk_count

  @property
  def disk_results(self):
    """Returns per-disk results.

    """
    return self._dresults

  def ReportConnected(self, ie, private):
    """Called when a connection has been established.

    """
    (idx, _) = private
    self._feedback_fn("Disk %s is now sending data" % idx)

  def ReportProgress(self, ie, private):
    """Called when new progress information should be reported.

    """
    (idx, _) = private

    progress = ie.progress
    if progress:
      self._feedback_fn("Disk %s sent %s" % (idx, FormatProgress(progress)))

  def ReportFinished(self, ie, private):
    """Called when a transfer has finished.

    """
    (idx, finished_fn) = private

    success = bool(ie.success)

    if success:
      msg = "Disk %s finished sending data" % idx
    else:
      msg = ("Disk %s failed to send data: %s (recent output: %r)" %
             (idx, ie.final_message, ie.recent_output))
    self._feedback_fn(msg)

    self._dresults[idx] = success

    if finished_fn:
      finished_fn()


class ExportInstanceHelper:
  def __init__(self, lu, feedback_fn, instance):
    """Initializes this class.

    @param lu: Logical unit instance
    @param feedback_fn: Feedback function
    @type instance: L{objects.Instance}
    @param instance: Instance object

    """
    self._lu = lu
    self._feedback_fn = feedback_fn
    self._instance = instance

    # Snapshot disks created by CreateSnapshots; a failed snapshot is
    # recorded as False in this list
    self._snap_disks = []
    # Per-disk flag tracking whether the snapshot was already removed
    self._removed_snaps = [False] * len(instance.disks)

  def CreateSnapshots(self):
    """Creates an LVM snapshot for every disk of the instance.

    Must only be called once; failed snapshots are recorded as C{False} in
    the internal snapshot list and are skipped by later transfers.

    """
    assert not self._snap_disks

    instance = self._instance
    src_node = instance.primary_node

    vgname = self._lu.cfg.GetVGName()

    for idx, disk in enumerate(instance.disks):
      self._feedback_fn("Creating a snapshot of disk/%s on node %s" %
                        (idx, src_node))

      # result.payload will be a snapshot of an lvm leaf of the one we
      # passed
      result = self._lu.rpc.call_blockdev_snapshot(src_node, disk)
      msg = result.fail_msg
      if msg:
        # A failed snapshot is a warning only; the export continues with
        # the remaining disks
        self._lu.LogWarning("Could not snapshot disk/%s on node %s: %s",
                            idx, src_node, msg)
        new_dev = False
      else:
        disk_id = (vgname, result.payload)
        new_dev = objects.Disk(dev_type=constants.LD_LV, size=disk.size,
                               logical_id=disk_id, physical_id=disk_id,
                               iv_name=disk.iv_name)

      self._snap_disks.append(new_dev)

    assert len(self._snap_disks) == len(instance.disks)
    assert len(self._removed_snaps) == len(instance.disks)

  def _RemoveSnapshot(self, disk_index):
    """Removes an LVM snapshot.

    Safe to call repeatedly; a snapshot is only removed once and entries
    for failed snapshots are ignored.

    @type disk_index: number
    @param disk_index: Index of the snapshot to be removed

    """
    disk = self._snap_disks[disk_index]
    if disk and not self._removed_snaps[disk_index]:
      src_node = self._instance.primary_node

      self._feedback_fn("Removing snapshot of disk/%s on node %s" %
                        (disk_index, src_node))

      result = self._lu.rpc.call_blockdev_remove(src_node, disk)
      if result.fail_msg:
        # On failure the flag stays False, so a later call retries
        self._lu.LogWarning("Could not remove snapshot for disk/%d from node"
                            " %s: %s", disk_index, src_node, result.fail_msg)
      else:
        self._removed_snaps[disk_index] = True

  def LocalExport(self, dest_node):
    """Intra-cluster instance export.

    @type dest_node: L{objects.Node}
    @param dest_node: Destination node
    @rtype: tuple
    @return: Tuple of (whether finalization succeeded, per-disk transfer
      results)

    """
    instance = self._instance
    src_node = instance.primary_node

    assert len(self._snap_disks) == len(instance.disks)

    transfers = []

    for idx, dev in enumerate(self._snap_disks):
      # Disks whose snapshot failed are recorded as False and skipped
      if not dev:
        transfers.append(None)
        continue

      path = utils.PathJoin(constants.EXPORT_DIR, "%s.new" % instance.name,
                            dev.physical_id[1])

      # Removes the snapshot once its data has been transferred
      finished_fn = compat.partial(self._TransferFinished, idx)

      # FIXME: pass debug option from opcode to backend
      dt = DiskTransfer("snapshot/%s" % idx,
                        constants.IEIO_SCRIPT, (dev, idx),
                        constants.IEIO_FILE, (path, ),
                        finished_fn)
      transfers.append(dt)

    # Actually export data
    dresults = TransferInstanceData(self._lu, self._feedback_fn,
                                    src_node, dest_node.name,
                                    dest_node.secondary_ip,
                                    instance, transfers)

    assert len(dresults) == len(instance.disks)

    self._feedback_fn("Finalizing export on %s" % dest_node.name)
    result = self._lu.rpc.call_finalize_export(dest_node.name, instance,
                                               self._snap_disks)
    msg = result.fail_msg
    fin_resu = not msg
    if msg:
      self._lu.LogWarning("Could not finalize export for instance %s"
                          " on node %s: %s", instance.name, dest_node.name, msg)

    return (fin_resu, dresults)

  def RemoteExport(self, disk_info, key_name, dest_ca_pem, timeouts):
    """Inter-cluster instance export.

    @type disk_info: list
    @param disk_info: Per-disk destination information
    @type key_name: string
    @param key_name: Name of X509 key to use
    @type dest_ca_pem: string
    @param dest_ca_pem: Destination X509 CA in PEM format
    @type timeouts: L{ImportExportTimeouts}
    @param timeouts: Timeouts for this import
    @rtype: tuple
    @return: Tuple of (True, per-disk results collected by the callbacks)

    """
    instance = self._instance

    assert len(disk_info) == len(instance.disks)

    cbs = _RemoteExportCb(self._feedback_fn, len(instance.disks))

    ieloop = ImportExportLoop(self._lu)
    try:
      for idx, (dev, (host, port, magic)) in enumerate(zip(instance.disks,
                                                           disk_info)):
        opts = objects.ImportExportOptions(key_name=key_name,
                                           ca_pem=dest_ca_pem,
                                           magic=magic)

        self._feedback_fn("Sending disk %s to %s:%s" % (idx, host, port))
        finished_fn = compat.partial(self._TransferFinished, idx)
        ieloop.Add(DiskExport(self._lu, instance.primary_node,
                              opts, host, port, instance,
                              constants.IEIO_SCRIPT, (dev, idx),
                              timeouts, cbs, private=(idx, finished_fn)))

      ieloop.Run()
    finally:
      # Always shut down daemons, even when the loop failed
      ieloop.FinalizeAll()

    return (True, cbs.disk_results)

  def _TransferFinished(self, idx):
    """Called once a transfer has finished.

    @type idx: number
    @param idx: Disk index

    """
    logging.debug("Transfer %s finished", idx)
    self._RemoveSnapshot(idx)

  def Cleanup(self):
    """Remove all snapshots.

    Relies on L{_RemoveSnapshot} being idempotent, so snapshots already
    removed after their transfer are not touched again.

    """
    assert len(self._removed_snaps) == len(self._instance.disks)
    for idx in range(len(self._instance.disks)):
      self._RemoveSnapshot(idx)


class _RemoteImportCb(ImportExportCbBase):
  def __init__(self, feedback_fn, cds, x509_cert_pem, disk_count,
               external_address):
    """Initializes this class.

    @param feedback_fn: Feedback function
    @type cds: string
    @param cds: Cluster domain secret
    @type x509_cert_pem: string
    @param x509_cert_pem: CA used for signing import key
    @type disk_count: number
    @param disk_count: Number of disks
    @type external_address: string
    @param external_address: External address of destination node

    """
    ImportExportCbBase.__init__(self)
    self._feedback_fn = feedback_fn
    self._cds = cds
    self._x509_cert_pem = x509_cert_pem
    self._disk_count = disk_count
    self._external_address = external_address

    # Per-disk result slots, filled in by ReportFinished
    self._dresults = [None] * disk_count
    # Per-disk (port, magic) tuples while the daemon is listening; None
    # before listening starts and again once the transfer has finished
    self._daemon_port = [None] * disk_count

    # Salt used when signing the disk information sent to the client
    self._salt = utils.GenerateSecret(8)

  @property
  def disk_results(self):
    """Returns per-disk results.

    """
    return self._dresults

  def _CheckAllListening(self):
    """Checks whether all daemons are listening.

    If all daemons are listening, the information is sent to the client.

    """
    # Wait until every disk's daemon has reported in
    if not compat.all(dp is not None for dp in self._daemon_port):
      return

    host = self._external_address

    disks = []
    for idx, (port, magic) in enumerate(self._daemon_port):
      disks.append(ComputeRemoteImportDiskInfo(self._cds, self._salt,
                                               idx, host, port, magic))

    assert len(disks) == self._disk_count

    self._feedback_fn(constants.ELOG_REMOTE_IMPORT, {
      "disks": disks,
      "x509_ca": self._x509_cert_pem,
      })

  def ReportListening(self, ie, private):
    """Called when daemon started listening.

    """
    (idx, ) = private

    self._feedback_fn("Disk %s is now listening" % idx)

    assert self._daemon_port[idx] is None

    self._daemon_port[idx] = (ie.listen_port, ie.magic)

    self._CheckAllListening()

  def ReportConnected(self, ie, private):
    """Called when a connection has been established.

    """
    (idx, ) = private

    self._feedback_fn("Disk %s is now receiving data" % idx)

  def ReportFinished(self, ie, private):
    """Called when a transfer has finished.

    """
    (idx, ) = private

    # Daemon is certainly no longer listening
    self._daemon_port[idx] = None

    if ie.success:
      self._feedback_fn("Disk %s finished receiving data" % idx)
    else:
      self._feedback_fn(("Disk %s failed to receive data: %s"
                         " (recent output: %r)") %
                        (idx, ie.final_message, ie.recent_output))

    self._dresults[idx] = bool(ie.success)


def RemoteImport(lu, feedback_fn, instance, source_x509_ca, cds, timeouts):
  """Imports an instance from another cluster.

  @param lu: Logical unit instance
  @param feedback_fn: Feedback function
  @type instance: L{objects.Instance}
  @param instance: Instance object
  @type source_x509_ca: OpenSSL.crypto.X509
  @param source_x509_ca: Import source's X509 CA
  @type cds: string
  @param cds: Cluster domain secret
  @type timeouts: L{ImportExportTimeouts}
  @param timeouts: Timeouts for this import
  @return: Per-disk results as collected by L{_RemoteImportCb}

  """
  source_ca_pem = OpenSSL.crypto.dump_certificate(OpenSSL.crypto.FILETYPE_PEM,
                                                  source_x509_ca)

  # Seed shared by all per-disk magic values
  magic_base = utils.GenerateSecret(6)

  # Create crypto key
  result = lu.rpc.call_x509_cert_create(instance.primary_node,
                                        constants.RIE_CERT_VALIDITY)
  result.Raise("Can't create X509 key and certificate on %s" % result.node)

  (x509_key_name, x509_cert_pem) = result.payload
  try:
    # Load certificate
    x509_cert = OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_PEM,
                                                x509_cert_pem)

    # Sign certificate
    signed_x509_cert_pem = \
      utils.SignX509Certificate(x509_cert, cds, utils.GenerateSecret(8))

    cbs = _RemoteImportCb(feedback_fn, cds, signed_x509_cert_pem,
                          len(instance.disks), instance.primary_node)

    ieloop = ImportExportLoop(lu)
    try:
      for idx, dev in enumerate(instance.disks):
        magic = _GetInstDiskMagic(magic_base, instance.name, idx)

        # Import daemon options
        opts = objects.ImportExportOptions(key_name=x509_key_name,
                                           ca_pem=source_ca_pem,
                                           magic=magic)

        ieloop.Add(DiskImport(lu, instance.primary_node, opts, instance,
                              constants.IEIO_SCRIPT, (dev, idx),
                              timeouts, cbs, private=(idx, )))

      ieloop.Run()
    finally:
      # Always shut down the import daemons, even on failure
      ieloop.FinalizeAll()
  finally:
    # Remove crypto key and certificate
    result = lu.rpc.call_x509_cert_remove(instance.primary_node, x509_key_name)
    result.Raise("Can't remove X509 key and certificate on %s" % result.node)

  return cbs.disk_results


def _GetImportExportHandshakeMessage(version):
  """Builds the text used for the remote import/export handshake.

  @type version: number
  @param version: RIE protocol version

  """
  fields = (version, constants.RIE_HANDSHAKE)
  return "%s:%s" % fields


def ComputeRemoteExportHandshake(cds):
  """Computes the remote import/export handshake.

  @type cds: string
  @param cds: Cluster domain secret

  """
  version = constants.RIE_VERSION

  # Fresh salt for every handshake
  hmac_salt = utils.GenerateSecret(8)
  hmac_digest = utils.Sha1Hmac(cds, _GetImportExportHandshakeMessage(version),
                               salt=hmac_salt)

  return (version, hmac_digest, hmac_salt)


def CheckRemoteExportHandshake(cds, handshake):
1491
  """Checks the handshake of a remote import/export.
1492

1493
  @type cds: string
1494
  @param cds: Cluster domain secret
1495
  @type handshake: sequence
1496
  @param handshake: Handshake sent by remote peer
1497

1498
  """
1499
  try:
1500
    (version, hmac_digest, hmac_salt) = handshake
1501
  except (TypeError, ValueError), err:
1502
    return "Invalid data: %s" % err
1503

    
1504
  if not utils.VerifySha1Hmac(cds, _GetImportExportHandshakeMessage(version),
1505
                              hmac_digest, salt=hmac_salt):
1506
    return "Hash didn't match, clusters don't share the same domain secret"
1507

    
1508
  if version != constants.RIE_VERSION:
1509
    return ("Clusters don't have the same remote import/export protocol"
1510
            " (local=%s, remote=%s)" %
1511
            (constants.RIE_VERSION, version))
1512

    
1513
  return None
1514

    
1515

    
1516
def _GetRieDiskInfoMessage(disk_index, host, port, magic):
  """Returns the hashed text for import/export disk information.

  @type disk_index: number
  @param disk_index: Index of disk (included in hash)
  @type host: string
  @param host: Hostname
  @type port: number
  @param port: Daemon port
  @type magic: string
  @param magic: Magic value

  """
  fields = (disk_index, host, port, magic)
  return "%s:%s:%s:%s" % fields


def CheckRemoteExportDiskInfo(cds, disk_index, disk_info):
1533
  """Verifies received disk information for an export.
1534

1535
  @type cds: string
1536
  @param cds: Cluster domain secret
1537
  @type disk_index: number
1538
  @param disk_index: Index of disk (included in hash)
1539
  @type disk_info: sequence
1540
  @param disk_info: Disk information sent by remote peer
1541

1542
  """
1543
  try:
1544
    (host, port, magic, hmac_digest, hmac_salt) = disk_info
1545
  except (TypeError, ValueError), err:
1546
    raise errors.GenericError("Invalid data: %s" % err)
1547

    
1548
  if not (host and port and magic):
1549
    raise errors.GenericError("Missing destination host, port or magic")
1550

    
1551
  msg = _GetRieDiskInfoMessage(disk_index, host, port, magic)
1552

    
1553
  if not utils.VerifySha1Hmac(cds, msg, hmac_digest, salt=hmac_salt):
1554
    raise errors.GenericError("HMAC is wrong")
1555

    
1556
  return (utils.HostInfo.NormalizeName(host),
1557
          utils.ValidateServiceName(port),
1558
          magic)
1559

    
1560

    
1561
def ComputeRemoteImportDiskInfo(cds, salt, disk_index, host, port, magic):
  """Computes the signed disk information for a remote import.

  @type cds: string
  @param cds: Cluster domain secret
  @type salt: string
  @param salt: HMAC salt
  @type disk_index: number
  @param disk_index: Index of disk (included in hash)
  @type host: string
  @param host: Hostname
  @type port: number
  @param port: Daemon port
  @type magic: string
  @param magic: Magic value

  """
  # Sign the same message the receiving side will reconstruct
  digest = utils.Sha1Hmac(cds,
                          _GetRieDiskInfoMessage(disk_index, host, port,
                                                 magic),
                          salt=salt)
  return (host, port, magic, digest, salt)