Statistics
| Branch: | Tag: | Revision:

root / lib / masterd / instance.py @ 800ac399

History | View | Annotate | Download (44 kB)

1
#
2
#
3

    
4
# Copyright (C) 2010 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Instance-related functions and classes for masterd.
23

24
"""
25

    
26
import logging
27
import time
28
import OpenSSL
29

    
30
from ganeti import constants
31
from ganeti import errors
32
from ganeti import compat
33
from ganeti import utils
34
from ganeti import objects
35
from ganeti import netutils
36

    
37

    
38
class _ImportExportError(Exception):
  """Local exception to report import/export errors.

  Raised by the helpers in this module and caught by the main loop in
  L{ImportExportLoop.Run}, which records the message via C{Finalize}.

  """
42

    
43

    
44
class ImportExportTimeouts(object):
  """Timeout settings for a single import/export operation.

  """
  #: Time until daemon starts writing status file
  DEFAULT_READY_TIMEOUT = 10

  #: Length of time until errors cause hard failure
  DEFAULT_ERROR_TIMEOUT = 10

  #: Time after which daemon must be listening
  DEFAULT_LISTEN_TIMEOUT = 10

  #: Progress update interval
  DEFAULT_PROGRESS_INTERVAL = 60

  __slots__ = [
    "connect",
    "error",
    "listen",
    "progress",
    "ready",
    ]

  def __init__(self, connect,
               listen=DEFAULT_LISTEN_TIMEOUT,
               error=DEFAULT_ERROR_TIMEOUT,
               ready=DEFAULT_READY_TIMEOUT,
               progress=DEFAULT_PROGRESS_INTERVAL):
    """Stores the given timeout values.

    @type connect: number
    @param connect: Timeout for establishing connection
    @type listen: number
    @param listen: Timeout for starting to listen for connections
    @type error: number
    @param error: Length of time until errors cause hard failure
    @type ready: number
    @param ready: Timeout for daemon to become ready
    @type progress: number
    @param progress: Progress update interval

    """
    self.connect = connect
    self.listen = listen
    self.ready = ready
    self.error = error
    self.progress = progress
89

    
90

    
91
class ImportExportCbBase(object):
  """Callbacks for disk import/export.

  All methods are no-ops by default; subclasses override the events they
  are interested in.

  """
  def ReportListening(self, ie, private):
    """Called when daemon started listening.

    @type ie: Subclass of L{_DiskImportExportBase}
    @param ie: Import/export object
    @param private: Private data passed to import/export object

    """

  def ReportConnected(self, ie, private):
    """Called when a connection has been established.

    @type ie: Subclass of L{_DiskImportExportBase}
    @param ie: Import/export object
    @param private: Private data passed to import/export object

    """

  def ReportProgress(self, ie, private):
    """Called when new progress information should be reported.

    @type ie: Subclass of L{_DiskImportExportBase}
    @param ie: Import/export object
    @param private: Private data passed to import/export object

    """

  def ReportFinished(self, ie, private):
    """Called when a transfer has finished.

    @type ie: Subclass of L{_DiskImportExportBase}
    @param ie: Import/export object
    @param private: Private data passed to import/export object

    """
130

    
131

    
132
def _TimeoutExpired(epoch, timeout, _time_fn=time.time):
133
  """Checks whether a timeout has expired.
134

135
  """
136
  return _time_fn() > (epoch + timeout)
137

    
138

    
139
class _DiskImportExportBase(object):
  """Base class for one disk import or export.

  Tracks the lifecycle of a remote import/export daemon (start, ready,
  listen, connect, finish, cleanup) and reports events through the
  L{ImportExportCbBase} callbacks supplied at construction time.

  """
  #: Human-readable mode name ("import"/"export"); must be set by subclasses
  MODE_TEXT = None

  def __init__(self, lu, node_name, opts,
               instance, timeouts, cbs, private=None):
    """Initializes this class.

    @param lu: Logical unit instance
    @type node_name: string
    @param node_name: Node name for import
    @type opts: L{objects.ImportExportOptions}
    @param opts: Import/export daemon options
    @type instance: L{objects.Instance}
    @param instance: Instance object
    @type timeouts: L{ImportExportTimeouts}
    @param timeouts: Timeouts for this import
    @type cbs: L{ImportExportCbBase}
    @param cbs: Callbacks
    @param private: Private data for callback functions

    """
    assert self.MODE_TEXT

    self._lu = lu
    self.node_name = node_name
    self._opts = opts
    self._instance = instance
    self._timeouts = timeouts
    self._cbs = cbs
    self._private = private

    # Parent loop
    self._loop = None

    # Timestamps (all set lazily once the corresponding event happens)
    self._ts_begin = None
    self._ts_connected = None
    self._ts_finished = None
    self._ts_cleanup = None
    self._ts_last_progress = None
    self._ts_last_error = None

    # Transfer status; success stays None while the transfer is active
    self.success = None
    self.final_message = None

    # Daemon status
    self._daemon_name = None
    self._daemon = None

  @property
  def recent_output(self):
    """Returns the most recent output from the daemon.

    Returns C{None} when no daemon status has been received yet.

    """
    if self._daemon:
      return self._daemon.recent_output

    return None

  @property
  def progress(self):
    """Returns transfer progress information.

    @return: C{(mbytes, throughput, percent, eta)} tuple, or C{None} when
      no daemon status has been received yet

    """
    if not self._daemon:
      return None

    return (self._daemon.progress_mbytes,
            self._daemon.progress_throughput,
            self._daemon.progress_percent,
            self._daemon.progress_eta)

  @property
  def magic(self):
    """Returns the magic value for this import/export.

    """
    return self._opts.magic

  @property
  def active(self):
    """Determines whether this transport is still active.

    """
    return self.success is None

  @property
  def loop(self):
    """Returns parent loop.

    @rtype: L{ImportExportLoop}

    """
    return self._loop

  def SetLoop(self, loop):
    """Sets the parent loop.

    @type loop: L{ImportExportLoop}
    @raise errors.ProgrammerError: If a loop was already set

    """
    if self._loop:
      raise errors.ProgrammerError("Loop can only be set once")

    self._loop = loop

  def _StartDaemon(self):
    """Starts the import/export daemon.

    Implemented by subclasses; must return an RPC result object.

    """
    raise NotImplementedError()

  def CheckDaemon(self):
    """Checks whether daemon has been started and if not, starts it.

    @rtype: string
    @return: Daemon name
    @raise _ImportExportError: If starting the daemon failed

    """
    assert self._ts_cleanup is None

    if self._daemon_name is None:
      assert self._ts_begin is None

      result = self._StartDaemon()
      if result.fail_msg:
        raise _ImportExportError("Failed to start %s on %s: %s" %
                                 (self.MODE_TEXT, self.node_name,
                                  result.fail_msg))

      daemon_name = result.payload

      logging.info("Started %s %r on %s", self.MODE_TEXT, daemon_name,
                   self.node_name)

      self._ts_begin = time.time()
      self._daemon_name = daemon_name

    return self._daemon_name

  def GetDaemonName(self):
    """Returns the daemon name.

    """
    assert self._daemon_name, "Daemon has not been started"
    assert self._ts_cleanup is None
    return self._daemon_name

  def Abort(self):
    """Sends SIGTERM to import/export daemon (if still active).

    @rtype: bool
    @return: Whether the abort RPC succeeded (also C{True} when no daemon
      was running)

    """
    if self._daemon_name:
      self._lu.LogWarning("Aborting %s %r on %s",
                          self.MODE_TEXT, self._daemon_name, self.node_name)
      result = self._lu.rpc.call_impexp_abort(self.node_name, self._daemon_name)
      if result.fail_msg:
        self._lu.LogWarning("Failed to abort %s %r on %s: %s",
                            self.MODE_TEXT, self._daemon_name,
                            self.node_name, result.fail_msg)
        return False

    return True

  def _SetDaemonData(self, data):
    """Internal function for updating status daemon data.

    @type data: L{objects.ImportExportStatus}
    @param data: Daemon status data
    @rtype: bool
    @return: Whether status data was available
    @raise _ImportExportError: If the daemon did not write its status file
      within the "ready" timeout

    """
    assert self._ts_begin is not None

    if not data:
      if _TimeoutExpired(self._ts_begin, self._timeouts.ready):
        raise _ImportExportError("Didn't become ready after %s seconds" %
                                 self._timeouts.ready)

      return False

    self._daemon = data

    return True

  def SetDaemonData(self, success, data):
    """Updates daemon status data.

    @type success: bool
    @param success: Whether fetching data was successful or not
    @type data: L{objects.ImportExportStatus}
    @param data: Daemon status data
    @rtype: bool
    @return: Whether fresh status data is now available
    @raise _ImportExportError: If fetching kept failing for longer than the
      error timeout

    """
    if not success:
      # Remember when errors started; only fail hard once they persist
      if self._ts_last_error is None:
        self._ts_last_error = time.time()

      elif _TimeoutExpired(self._ts_last_error, self._timeouts.error):
        raise _ImportExportError("Too many errors while updating data")

      return False

    self._ts_last_error = None

    return self._SetDaemonData(data)

  def CheckListening(self):
    """Checks whether the daemon is listening.

    Implemented by subclasses.

    """
    raise NotImplementedError()

  def _GetConnectedCheckEpoch(self):
    """Returns timeout to calculate connect timeout.

    Implemented by subclasses.

    """
    raise NotImplementedError()

  def CheckConnected(self):
    """Checks whether the daemon is connected.

    @rtype: bool
    @return: Whether the daemon is connected
    @raise _ImportExportError: If no connection was made within the
      connect timeout

    """
    assert self._daemon, "Daemon status missing"

    if self._ts_connected is not None:
      return True

    if self._daemon.connected:
      self._ts_connected = time.time()

      # TODO: Log remote peer
      logging.debug("%s %r on %s is now connected",
                    self.MODE_TEXT, self._daemon_name, self.node_name)

      self._cbs.ReportConnected(self, self._private)

      return True

    if _TimeoutExpired(self._GetConnectedCheckEpoch(), self._timeouts.connect):
      raise _ImportExportError("Not connected after %s seconds" %
                               self._timeouts.connect)

    return False

  def _CheckProgress(self):
    """Checks whether a progress update should be reported.

    Reports at most once per progress interval, and only when the daemon
    has published byte and throughput figures.

    """
    if ((self._ts_last_progress is None or
         _TimeoutExpired(self._ts_last_progress, self._timeouts.progress)) and
        self._daemon and
        self._daemon.progress_mbytes is not None and
        self._daemon.progress_throughput is not None):
      self._cbs.ReportProgress(self, self._private)
      self._ts_last_progress = time.time()

  def CheckFinished(self):
    """Checks whether the daemon exited.

    @rtype: bool
    @return: Whether the transfer is finished

    """
    assert self._daemon, "Daemon status missing"

    if self._ts_finished:
      return True

    if self._daemon.exit_status is None:
      # TODO: Adjust delay for ETA expiring soon
      self._CheckProgress()
      return False

    self._ts_finished = time.time()

    # Exit status 0 counts as success; anything else as failure
    self._ReportFinished(self._daemon.exit_status == 0,
                         self._daemon.error_message)

    return True

  def _ReportFinished(self, success, message):
    """Transfer is finished or daemon exited.

    @type success: bool
    @param success: Whether the transfer was successful
    @type message: string
    @param message: Error message

    """
    assert self.success is None

    self.success = success
    self.final_message = message

    if success:
      logging.info("%s %r on %s succeeded", self.MODE_TEXT, self._daemon_name,
                   self.node_name)
    elif self._daemon_name:
      self._lu.LogWarning("%s %r on %s failed: %s",
                          self.MODE_TEXT, self._daemon_name, self.node_name,
                          message)
    else:
      self._lu.LogWarning("%s on %s failed: %s", self.MODE_TEXT,
                          self.node_name, message)

    self._cbs.ReportFinished(self, self._private)

  def _Finalize(self):
    """Makes the RPC call to finalize this import/export.

    """
    return self._lu.rpc.call_impexp_cleanup(self.node_name, self._daemon_name)

  def Finalize(self, error=None):
    """Finalizes this import/export.

    @type error: string
    @param error: Optional error message; when given, the transfer is
      recorded as failed with this message
    @rtype: bool
    @return: Whether finalization succeeded

    """
    if self._daemon_name:
      logging.info("Finalizing %s %r on %s",
                   self.MODE_TEXT, self._daemon_name, self.node_name)

      result = self._Finalize()
      if result.fail_msg:
        self._lu.LogWarning("Failed to finalize %s %r on %s: %s",
                            self.MODE_TEXT, self._daemon_name,
                            self.node_name, result.fail_msg)
        return False

      # Daemon is no longer running
      self._daemon_name = None
      self._ts_cleanup = time.time()

    if error:
      self._ReportFinished(False, error)

    return True
479

    
480

    
481
class DiskImport(_DiskImportExportBase):
  MODE_TEXT = "import"

  def __init__(self, lu, node_name, opts, instance,
               dest, dest_args, timeouts, cbs, private=None):
    """Initializes the disk import.

    @param lu: Logical unit instance
    @type node_name: string
    @param node_name: Node name for import
    @type opts: L{objects.ImportExportOptions}
    @param opts: Import/export daemon options
    @type instance: L{objects.Instance}
    @param instance: Instance object
    @param dest: I/O destination
    @param dest_args: I/O arguments
    @type timeouts: L{ImportExportTimeouts}
    @param timeouts: Timeouts for this import
    @type cbs: L{ImportExportCbBase}
    @param cbs: Callbacks
    @param private: Private data for callback functions

    """
    _DiskImportExportBase.__init__(self, lu, node_name, opts,
                                   instance, timeouts, cbs, private)
    self._dest = dest
    self._dest_args = dest_args

    # Timestamp of when the daemon started listening, C{None} until then
    self._ts_listening = None

  @property
  def listen_port(self):
    """Port the import daemon is listening on (C{None} if unknown yet).

    """
    return self._daemon.listen_port if self._daemon else None

  def _StartDaemon(self):
    """Starts the import daemon via RPC.

    """
    return self._lu.rpc.call_import_start(self.node_name, self._opts,
                                          self._instance,
                                          self._dest, self._dest_args)

  def CheckListening(self):
    """Checks whether the daemon is listening.

    @rtype: bool
    @return: Whether the daemon is listening
    @raise _ImportExportError: If the daemon did not start listening in time

    """
    assert self._daemon, "Daemon status missing"

    # Already seen listening earlier
    if self._ts_listening is not None:
      return True

    port = self._daemon.listen_port
    if port is None:
      # Not listening yet; enforce the listen timeout
      if _TimeoutExpired(self._ts_begin, self._timeouts.listen):
        raise _ImportExportError("Not listening after %s seconds" %
                                 self._timeouts.listen)
      return False

    self._ts_listening = time.time()

    logging.debug("Import %r on %s is now listening on port %s",
                  self._daemon_name, self.node_name, port)

    self._cbs.ReportListening(self, self._private)

    return True

  def _GetConnectedCheckEpoch(self):
    """Returns the time since we started listening.

    """
    assert self._ts_listening is not None, \
           ("Checking whether an import is connected is only useful"
            " once it's been listening")

    return self._ts_listening
568

    
569

    
570
class DiskExport(_DiskImportExportBase):
  MODE_TEXT = "export"

  def __init__(self, lu, node_name, opts,
               dest_host, dest_port, instance, source, source_args,
               timeouts, cbs, private=None):
    """Initializes the disk export.

    @param lu: Logical unit instance
    @type node_name: string
    @param node_name: Node name for import
    @type opts: L{objects.ImportExportOptions}
    @param opts: Import/export daemon options
    @type dest_host: string
    @param dest_host: Destination host name or IP address
    @type dest_port: number
    @param dest_port: Destination port number
    @type instance: L{objects.Instance}
    @param instance: Instance object
    @param source: I/O source
    @param source_args: I/O source
    @type timeouts: L{ImportExportTimeouts}
    @param timeouts: Timeouts for this import
    @type cbs: L{ImportExportCbBase}
    @param cbs: Callbacks
    @param private: Private data for callback functions

    """
    _DiskImportExportBase.__init__(self, lu, node_name, opts,
                                   instance, timeouts, cbs, private)
    self._source = source
    self._source_args = source_args
    self._dest_host = dest_host
    self._dest_port = dest_port

  def _StartDaemon(self):
    """Starts the export daemon via RPC.

    """
    return self._lu.rpc.call_export_start(self.node_name, self._opts,
                                          self._dest_host, self._dest_port,
                                          self._instance, self._source,
                                          self._source_args)

  def CheckListening(self):
    """Checks whether the daemon is listening.

    Exports connect out rather than listen, so this is trivially true.

    """
    # Only an import can be listening
    return True

  def _GetConnectedCheckEpoch(self):
    """Returns the time since the daemon started.

    """
    assert self._ts_begin is not None

    return self._ts_begin
628

    
629

    
630
def FormatProgress(progress):
  """Renders a progress tuple as a human-readable string.

  @type progress: tuple
  @param progress: C{(mbytes, throughput, percent, eta)} as returned by
    L{_DiskImportExportBase.progress}
  @rtype: string

  """
  (mbytes, throughput, percent, eta) = progress

  fields = [utils.FormatUnit(mbytes, "h")]

  # Not using FormatUnit as it doesn't support kilobytes
  fields.append("%0.1f MiB/s" % throughput)

  if percent is not None:
    fields.append("%d%%" % percent)

  if eta is not None:
    fields.append("ETA %s" % utils.FormatSeconds(eta))

  return utils.CommaJoin(fields)
650

    
651

    
652
class ImportExportLoop:
  """Main loop driving a set of disk imports/exports to completion.

  Objects are added via L{Add} and processed by L{Run}, which polls the
  daemons' status until none of them is active anymore.

  """
  #: Lower/upper bounds for the polling delay in L{Run}, in seconds
  MIN_DELAY = 1.0
  MAX_DELAY = 20.0

  def __init__(self, lu):
    """Initializes this class.

    @param lu: Logical unit instance

    """
    self._lu = lu
    self._queue = []
    self._pending_add = []

  def Add(self, diskie):
    """Adds an import/export object to the loop.

    @type diskie: Subclass of L{_DiskImportExportBase}
    @param diskie: Import/export object

    """
    assert diskie not in self._pending_add
    assert diskie.loop is None

    diskie.SetLoop(self)

    # Adding new objects to a staging list is necessary, otherwise the main
    # loop gets confused if callbacks modify the queue while the main loop is
    # iterating over it.
    self._pending_add.append(diskie)

  @staticmethod
  def _CollectDaemonStatus(lu, daemons):
    """Collects the status for all import/export daemons.

    @type daemons: dict
    @param daemons: Dictionary mapping node name to a list of daemon names
    @rtype: dict
    @return: Dictionary mapping node name to a dictionary of daemon name to
      status; nodes whose status RPC failed are left out

    """
    daemon_status = {}

    for node_name, names in daemons.iteritems():
      result = lu.rpc.call_impexp_status(node_name, names)
      if result.fail_msg:
        lu.LogWarning("Failed to get daemon status on %s: %s",
                      node_name, result.fail_msg)
        continue

      assert len(names) == len(result.payload)

      daemon_status[node_name] = dict(zip(names, result.payload))

    return daemon_status

  @staticmethod
  def _GetActiveDaemonNames(queue):
    """Gets the names of all active daemons.

    Starts daemons that have not been started yet; objects whose daemon
    fails to start are finalized with the error message.

    @rtype: dict
    @return: Dictionary mapping node name to a list of daemon names

    """
    result = {}
    for diskie in queue:
      if not diskie.active:
        continue

      try:
        # Start daemon if necessary
        daemon_name = diskie.CheckDaemon()
      except _ImportExportError, err:
        logging.exception("%s failed", diskie.MODE_TEXT)
        diskie.Finalize(error=str(err))
        continue

      result.setdefault(diskie.node_name, []).append(daemon_name)

    assert len(queue) >= len(result)
    assert len(queue) >= sum([len(names) for names in result.itervalues()])

    logging.debug("daemons=%r", result)

    return result

  def _AddPendingToQueue(self):
    """Adds all pending import/export objects to the internal queue.

    """
    assert compat.all(diskie not in self._queue and diskie.loop == self
                      for diskie in self._pending_add)

    self._queue.extend(self._pending_add)

    del self._pending_add[:]

  def Run(self):
    """Utility main loop.

    Polls daemon status and advances each transfer through its states
    (ready, listening, connected, finished) until no transfer is active,
    sleeping an adaptive delay between iterations.

    """
    while True:
      self._AddPendingToQueue()

      # Collect all active daemon names
      daemons = self._GetActiveDaemonNames(self._queue)
      if not daemons:
        break

      # Collect daemon status data
      data = self._CollectDaemonStatus(self._lu, daemons)

      # Use data
      delay = self.MAX_DELAY
      for diskie in self._queue:
        if not diskie.active:
          continue

        try:
          try:
            all_daemon_data = data[diskie.node_name]
          except KeyError:
            # Status RPC for this node failed; count as an error
            result = diskie.SetDaemonData(False, None)
          else:
            result = \
              diskie.SetDaemonData(True,
                                   all_daemon_data[diskie.GetDaemonName()])

          if not result:
            # Daemon not yet ready, retry soon
            delay = min(3.0, delay)
            continue

          if diskie.CheckFinished():
            # Transfer finished
            diskie.Finalize()
            continue

          # Normal case: check again in 5 seconds
          delay = min(5.0, delay)

          if not diskie.CheckListening():
            # Not yet listening, retry soon
            delay = min(1.0, delay)
            continue

          if not diskie.CheckConnected():
            # Not yet connected, retry soon
            delay = min(1.0, delay)
            continue

        except _ImportExportError, err:
          logging.exception("%s failed", diskie.MODE_TEXT)
          diskie.Finalize(error=str(err))

      if not compat.any(diskie.active for diskie in self._queue):
        break

      # Wait a bit
      delay = min(self.MAX_DELAY, max(self.MIN_DELAY, delay))
      logging.debug("Waiting for %ss", delay)
      time.sleep(delay)

  def FinalizeAll(self):
    """Finalizes all pending transfers.

    @rtype: bool
    @return: Whether all transfers were finalized successfully

    """
    success = True

    for diskie in self._queue:
      success = diskie.Finalize() and success

    return success
815

    
816

    
817
class _TransferInstCbBase(ImportExportCbBase):
  def __init__(self, lu, feedback_fn, instance, timeouts, src_node, src_cbs,
               dest_node, dest_ip):
    """Stores the transfer context shared by source and destination callbacks.

    """
    ImportExportCbBase.__init__(self)

    self.lu = lu
    self.instance = instance
    self.feedback_fn = feedback_fn
    self.timeouts = timeouts
    self.src_node = src_node
    self.dest_node = dest_node
    self.src_cbs = src_cbs
    self.dest_ip = dest_ip
833

    
834

    
835
class _TransferInstSourceCb(_TransferInstCbBase):
  def ReportConnected(self, ie, dtp):
    """Called when a connection has been established.

    """
    assert self.src_cbs is None
    assert dtp.src_export == ie
    assert dtp.dest_import

    msg = "%s is sending data on %s" % (dtp.data.name, ie.node_name)
    self.feedback_fn(msg)

  def ReportProgress(self, ie, dtp):
    """Called when new progress information should be reported.

    """
    info = ie.progress
    if info:
      self.feedback_fn("%s sent %s" % (dtp.data.name, FormatProgress(info)))

  def ReportFinished(self, ie, dtp):
    """Called when a transfer has finished.

    """
    assert self.src_cbs is None
    assert dtp.src_export == ie
    assert dtp.dest_import

    if not ie.success:
      self.feedback_fn("%s failed to send data: %s (recent output: %r)" %
                       (dtp.data.name, ie.final_message, ie.recent_output))
    else:
      self.feedback_fn("%s finished sending data" % dtp.data.name)

    dtp.RecordResult(ie.success)

    finished_fn = dtp.data.finished_fn
    if finished_fn:
      finished_fn()

    # TODO: Check whether sending SIGTERM right away is okay, maybe we should
    # give the daemon a moment to sort things out
    if dtp.dest_import and not ie.success:
      dtp.dest_import.Abort()
881

    
882

    
883
class _TransferInstDestCb(_TransferInstCbBase):
  def ReportListening(self, ie, dtp):
    """Called when daemon started listening.

    """
    assert self.src_cbs
    assert dtp.src_export is None
    assert dtp.dest_import
    assert dtp.export_opts

    self.feedback_fn("%s is now listening, starting export" % dtp.data.name)

    # Start export on source node
    export = DiskExport(self.lu, self.src_node, dtp.export_opts,
                        self.dest_ip, ie.listen_port,
                        self.instance, dtp.data.src_io, dtp.data.src_ioargs,
                        self.timeouts, self.src_cbs, private=dtp)
    ie.loop.Add(export)

    dtp.src_export = export

  def ReportConnected(self, ie, dtp):
    """Called when a connection has been established.

    """
    msg = "%s is receiving data on %s" % (dtp.data.name, self.dest_node)
    self.feedback_fn(msg)

  def ReportFinished(self, ie, dtp):
    """Called when a transfer has finished.

    """
    if not ie.success:
      self.feedback_fn("%s failed to receive data: %s (recent output: %r)" %
                       (dtp.data.name, ie.final_message, ie.recent_output))
    else:
      self.feedback_fn("%s finished receiving data" % dtp.data.name)

    dtp.RecordResult(ie.success)

    # TODO: Check whether sending SIGTERM right away is okay, maybe we should
    # give the daemon a moment to sort things out
    if dtp.src_export and not ie.success:
      dtp.src_export.Abort()
927

    
928

    
929
class DiskTransfer(object):
  def __init__(self, name, src_io, src_ioargs, dest_io, dest_ioargs,
               finished_fn):
    """Describes one disk transfer between two nodes.

    @type name: string
    @param name: User-visible name for this transfer (e.g. "disk/0")
    @param src_io: Source I/O type
    @param src_ioargs: Source I/O arguments
    @param dest_io: Destination I/O type
    @param dest_ioargs: Destination I/O arguments
    @type finished_fn: callable
    @param finished_fn: Function called once transfer has finished

    """
    self.name = name
    self.finished_fn = finished_fn

    # Source side
    self.src_io = src_io
    self.src_ioargs = src_ioargs

    # Destination side
    self.dest_io = dest_io
    self.dest_ioargs = dest_ioargs
953

    
954

    
955
class _DiskTransferPrivate(object):
956
  def __init__(self, data, success, export_opts):
957
    """Initializes this class.
958

959
    @type data: L{DiskTransfer}
960
    @type success: bool
961

962
    """
963
    self.data = data
964
    self.success = success
965
    self.export_opts = export_opts
966

    
967
    self.src_export = None
968
    self.dest_import = None
969

    
970
  def RecordResult(self, success):
971
    """Updates the status.
972

973
    One failed part will cause the whole transfer to fail.
974

975
    """
976
    self.success = self.success and success
977

    
978

    
979
def _GetInstDiskMagic(base, instance_name, index):
  """Computes the magic value for a disk export or import.

  @type base: string
  @param base: Random seed value (can be the same for all disks of a transfer)
  @type instance_name: string
  @param instance_name: Name of instance
  @type index: number
  @param index: Disk index
  @rtype: string
  @return: Hex digest uniquely identifying this disk transfer

  """
  h = compat.sha1_hash()
  for piece in (str(constants.RIE_VERSION), base, instance_name, str(index)):
    h.update(piece)
  return h.hexdigest()
996

    
997

    
998
def TransferInstanceData(lu, feedback_fn, src_node, dest_node, dest_ip,
                         instance, all_transfers):
  """Transfers an instance's data from one node to another.

  @param lu: Logical unit instance
  @param feedback_fn: Feedback function
  @type src_node: string
  @param src_node: Source node name
  @type dest_node: string
  @param dest_node: Destination node name
  @type dest_ip: string
  @param dest_ip: IP address of destination node
  @type instance: L{objects.Instance}
  @param instance: Instance object
  @type all_transfers: list of L{DiskTransfer} instances
  @param all_transfers: List of all disk transfers to be made
  @rtype: list
  @return: List with a boolean (True=successful, False=failed) for success for
           each transfer

  """
  # Disable compression for all moves as these are all within the same cluster
  compress = constants.IEC_NONE

  logging.debug("Source node %s, destination node %s, compression '%s'",
                src_node, dest_node, compress)

  timeouts = ImportExportTimeouts(constants.DISK_TRANSFER_CONNECT_TIMEOUT)
  # The source callback object is handed to the destination callback;
  # presumably the destination side uses it to start the export once the
  # import daemon is listening (callback classes are defined elsewhere in
  # this file) -- confirm against _TransferInstDestCb
  src_cbs = _TransferInstSourceCb(lu, feedback_fn, instance, timeouts,
                                  src_node, None, dest_node, dest_ip)
  dest_cbs = _TransferInstDestCb(lu, feedback_fn, instance, timeouts,
                                 src_node, src_cbs, dest_node, dest_ip)

  # One _DiskTransferPrivate per entry in all_transfers, in the same order
  all_dtp = []

  # Random seed shared by the magic values of all disks in this transfer
  base_magic = utils.GenerateSecret(6)

  ieloop = ImportExportLoop(lu)
  try:
    for idx, transfer in enumerate(all_transfers):
      if transfer:
        feedback_fn("Exporting %s from %s to %s" %
                    (transfer.name, src_node, dest_node))

        magic = _GetInstDiskMagic(base_magic, instance.name, idx)
        # Intra-cluster transfer: no encryption key or CA is used
        opts = objects.ImportExportOptions(key_name=None, ca_pem=None,
                                           compress=compress, magic=magic)

        dtp = _DiskTransferPrivate(transfer, True, opts)

        # Only the import daemon is added here; dtp.src_export is expected to
        # be filled in later by the callbacks
        di = DiskImport(lu, dest_node, opts, instance,
                        transfer.dest_io, transfer.dest_ioargs,
                        timeouts, dest_cbs, private=dtp)
        ieloop.Add(di)

        dtp.dest_import = di
      else:
        # Placeholder for disks which should not be transferred; reported as
        # failed (success=False) in the result list
        dtp = _DiskTransferPrivate(None, False, None)

      all_dtp.append(dtp)

    ieloop.Run()
  finally:
    # Always finalize daemons, even if the loop raised
    ieloop.FinalizeAll()

  assert len(all_dtp) == len(all_transfers)
  assert compat.all((dtp.src_export is None or
                      dtp.src_export.success is not None) and
                     (dtp.dest_import is None or
                      dtp.dest_import.success is not None)
                     for dtp in all_dtp), \
         "Not all imports/exports are finalized"

  return [bool(dtp.success) for dtp in all_dtp]
class _RemoteExportCb(ImportExportCbBase):
  def __init__(self, feedback_fn, disk_count):
    """Initializes this class.

    @param feedback_fn: Feedback function
    @type disk_count: number
    @param disk_count: Number of disks being exported

    """
    ImportExportCbBase.__init__(self)
    self._feedback_fn = feedback_fn
    # One result slot per disk, None until the disk's transfer has finished
    self._dresults = [None] * disk_count

  @property
  def disk_results(self):
    """Returns per-disk results.

    """
    return self._dresults

  def ReportConnected(self, ie, private):
    """Called when a connection has been established.

    """
    (disk_index, _) = private

    self._feedback_fn("Disk %s is now sending data" % disk_index)

  def ReportProgress(self, ie, private):
    """Called when new progress information should be reported.

    """
    (disk_index, _) = private

    progress = ie.progress
    if progress:
      self._feedback_fn("Disk %s sent %s" % (disk_index,
                                             FormatProgress(progress)))

  def ReportFinished(self, ie, private):
    """Called when a transfer has finished.

    """
    (disk_index, on_finished) = private

    if ie.success:
      message = "Disk %s finished sending data" % disk_index
    else:
      message = ("Disk %s failed to send data: %s (recent output: %r)" %
                 (disk_index, ie.final_message, ie.recent_output))
    self._feedback_fn(message)

    self._dresults[disk_index] = bool(ie.success)

    if on_finished:
      on_finished()
class ExportInstanceHelper:
  """Helper for exporting an instance.

  Creates LVM snapshots of the instance's disks, transfers them either to a
  directory on another node in the cluster (L{LocalExport}) or to a remote
  cluster (L{RemoteExport}), and removes the snapshots again (L{Cleanup}).

  """
  def __init__(self, lu, feedback_fn, instance):
    """Initializes this class.

    @param lu: Logical unit instance
    @param feedback_fn: Feedback function
    @type instance: L{objects.Instance}
    @param instance: Instance object

    """
    self._lu = lu
    self._feedback_fn = feedback_fn
    self._instance = instance

    # Snapshot disk objects, filled by CreateSnapshots (one entry per
    # instance disk, False for disks whose snapshot failed)
    self._snap_disks = []
    # Tracks which snapshots have already been removed
    self._removed_snaps = [False] * len(instance.disks)

  def CreateSnapshots(self):
    """Creates an LVM snapshot for every disk of the instance.

    """
    # Must only be called once
    assert not self._snap_disks

    instance = self._instance
    src_node = instance.primary_node

    for idx, disk in enumerate(instance.disks):
      self._feedback_fn("Creating a snapshot of disk/%s on node %s" %
                        (idx, src_node))

      # result.payload will be a snapshot of an lvm leaf of the one we
      # passed
      result = self._lu.rpc.call_blockdev_snapshot(src_node, disk)
      # new_dev stays False on any failure; later code treats such entries as
      # "no snapshot for this disk"
      new_dev = False
      msg = result.fail_msg
      if msg:
        self._lu.LogWarning("Could not snapshot disk/%s on node %s: %s",
                            idx, src_node, msg)
      elif (not isinstance(result.payload, (tuple, list)) or
            len(result.payload) != 2):
        self._lu.LogWarning("Could not snapshot disk/%s on node %s: invalid"
                            " result '%s'", idx, src_node, result.payload)
      else:
        disk_id = tuple(result.payload)
        new_dev = objects.Disk(dev_type=constants.LD_LV, size=disk.size,
                               logical_id=disk_id, physical_id=disk_id,
                               iv_name=disk.iv_name)

      self._snap_disks.append(new_dev)

    assert len(self._snap_disks) == len(instance.disks)
    assert len(self._removed_snaps) == len(instance.disks)

  def _RemoveSnapshot(self, disk_index):
    """Removes an LVM snapshot.

    @type disk_index: number
    @param disk_index: Index of the snapshot to be removed

    """
    disk = self._snap_disks[disk_index]
    # Skip disks without a snapshot and snapshots already removed; safe to
    # call multiple times for the same index
    if disk and not self._removed_snaps[disk_index]:
      src_node = self._instance.primary_node

      self._feedback_fn("Removing snapshot of disk/%s on node %s" %
                        (disk_index, src_node))

      result = self._lu.rpc.call_blockdev_remove(src_node, disk)
      if result.fail_msg:
        # Removal failure is only warned about; the flag stays False so a
        # later Cleanup() will retry
        self._lu.LogWarning("Could not remove snapshot for disk/%d from node"
                            " %s: %s", disk_index, src_node, result.fail_msg)
      else:
        self._removed_snaps[disk_index] = True

  def LocalExport(self, dest_node):
    """Intra-cluster instance export.

    @type dest_node: L{objects.Node}
    @param dest_node: Destination node
    @rtype: tuple
    @return: Tuple of (finalization successful, per-disk result booleans)

    """
    instance = self._instance
    src_node = instance.primary_node

    # CreateSnapshots() must have been called before
    assert len(self._snap_disks) == len(instance.disks)

    transfers = []

    for idx, dev in enumerate(self._snap_disks):
      if not dev:
        # No snapshot for this disk; None marks the transfer as skipped
        transfers.append(None)
        continue

      path = utils.PathJoin(constants.EXPORT_DIR, "%s.new" % instance.name,
                            dev.physical_id[1])

      # Removes the snapshot once its transfer is done
      finished_fn = compat.partial(self._TransferFinished, idx)

      # FIXME: pass debug option from opcode to backend
      dt = DiskTransfer("snapshot/%s" % idx,
                        constants.IEIO_SCRIPT, (dev, idx),
                        constants.IEIO_FILE, (path, ),
                        finished_fn)
      transfers.append(dt)

    # Actually export data
    dresults = TransferInstanceData(self._lu, self._feedback_fn,
                                    src_node, dest_node.name,
                                    dest_node.secondary_ip,
                                    instance, transfers)

    assert len(dresults) == len(instance.disks)

    self._feedback_fn("Finalizing export on %s" % dest_node.name)
    result = self._lu.rpc.call_finalize_export(dest_node.name, instance,
                                               self._snap_disks)
    msg = result.fail_msg
    fin_resu = not msg
    if msg:
      self._lu.LogWarning("Could not finalize export for instance %s"
                          " on node %s: %s", instance.name, dest_node.name, msg)

    return (fin_resu, dresults)

  def RemoteExport(self, disk_info, key_name, dest_ca_pem, timeouts):
    """Inter-cluster instance export.

    @type disk_info: list
    @param disk_info: Per-disk destination information
    @type key_name: string
    @param key_name: Name of X509 key to use
    @type dest_ca_pem: string
    @param dest_ca_pem: Destination X509 CA in PEM format
    @type timeouts: L{ImportExportTimeouts}
    @param timeouts: Timeouts for this import

    """
    instance = self._instance

    assert len(disk_info) == len(instance.disks)

    cbs = _RemoteExportCb(self._feedback_fn, len(instance.disks))

    ieloop = ImportExportLoop(self._lu)
    try:
      for idx, (dev, (host, port, magic)) in enumerate(zip(instance.disks,
                                                           disk_info)):
        # Decide whether to use IPv6
        ipv6 = netutils.IP6Address.IsValid(host)

        opts = objects.ImportExportOptions(key_name=key_name,
                                           ca_pem=dest_ca_pem,
                                           magic=magic, ipv6=ipv6)

        self._feedback_fn("Sending disk %s to %s:%s" % (idx, host, port))
        finished_fn = compat.partial(self._TransferFinished, idx)
        ieloop.Add(DiskExport(self._lu, instance.primary_node,
                              opts, host, port, instance,
                              constants.IEIO_SCRIPT, (dev, idx),
                              timeouts, cbs, private=(idx, finished_fn)))

      ieloop.Run()
    finally:
      ieloop.FinalizeAll()

    # Overall status is always True here; per-disk failures are reported in
    # cbs.disk_results
    return (True, cbs.disk_results)

  def _TransferFinished(self, idx):
    """Called once a transfer has finished.

    @type idx: number
    @param idx: Disk index

    """
    logging.debug("Transfer %s finished", idx)
    # The snapshot is no longer needed once its data has been transferred
    self._RemoveSnapshot(idx)

  def Cleanup(self):
    """Remove all snapshots.

    """
    assert len(self._removed_snaps) == len(self._instance.disks)
    # _RemoveSnapshot skips already-removed snapshots, so this is idempotent
    for idx in range(len(self._instance.disks)):
      self._RemoveSnapshot(idx)
class _RemoteImportCb(ImportExportCbBase):
  def __init__(self, feedback_fn, cds, x509_cert_pem, disk_count,
               external_address):
    """Initializes this class.

    @type cds: string
    @param cds: Cluster domain secret
    @type x509_cert_pem: string
    @param x509_cert_pem: CA used for signing import key
    @type disk_count: number
    @param disk_count: Number of disks
    @type external_address: string
    @param external_address: External address of destination node

    """
    ImportExportCbBase.__init__(self)
    self._feedback_fn = feedback_fn
    self._cds = cds
    self._x509_cert_pem = x509_cert_pem
    self._disk_count = disk_count
    self._external_address = external_address

    # Per-disk transfer results and (port, magic) pairs of listening daemons
    self._dresults = [None] * disk_count
    self._daemon_port = [None] * disk_count

    # Salt for signing the disk information sent to the client
    self._salt = utils.GenerateSecret(8)

  @property
  def disk_results(self):
    """Returns per-disk results.

    """
    return self._dresults

  def _CheckAllListening(self):
    """Checks whether all daemons are listening.

    If all daemons are listening, the information is sent to the client.

    """
    if not compat.all(entry is not None for entry in self._daemon_port):
      # At least one import daemon has yet to report in
      return

    host = self._external_address

    disks = [ComputeRemoteImportDiskInfo(self._cds, self._salt, disk_index,
                                         host, port, magic)
             for (disk_index, (port, magic)) in enumerate(self._daemon_port)]

    assert len(disks) == self._disk_count

    self._feedback_fn(constants.ELOG_REMOTE_IMPORT, {
      "disks": disks,
      "x509_ca": self._x509_cert_pem,
      })

  def ReportListening(self, ie, private):
    """Called when daemon started listening.

    """
    (disk_index, ) = private

    self._feedback_fn("Disk %s is now listening" % disk_index)

    assert self._daemon_port[disk_index] is None

    self._daemon_port[disk_index] = (ie.listen_port, ie.magic)

    self._CheckAllListening()

  def ReportConnected(self, ie, private):
    """Called when a connection has been established.

    """
    (disk_index, ) = private

    self._feedback_fn("Disk %s is now receiving data" % disk_index)

  def ReportFinished(self, ie, private):
    """Called when a transfer has finished.

    """
    (disk_index, ) = private

    # Daemon is certainly no longer listening
    self._daemon_port[disk_index] = None

    if ie.success:
      self._feedback_fn("Disk %s finished receiving data" % disk_index)
    else:
      self._feedback_fn(("Disk %s failed to receive data: %s"
                         " (recent output: %r)") %
                        (disk_index, ie.final_message, ie.recent_output))

    self._dresults[disk_index] = bool(ie.success)
def RemoteImport(lu, feedback_fn, instance, pnode, source_x509_ca,
                 cds, timeouts):
  """Imports an instance from another cluster.

  @param lu: Logical unit instance
  @param feedback_fn: Feedback function
  @type instance: L{objects.Instance}
  @param instance: Instance object
  @type pnode: L{objects.Node}
  @param pnode: Primary node of instance as an object
  @type source_x509_ca: OpenSSL.crypto.X509
  @param source_x509_ca: Import source's X509 CA
  @type cds: string
  @param cds: Cluster domain secret
  @type timeouts: L{ImportExportTimeouts}
  @param timeouts: Timeouts for this import

  """
  source_ca_pem = OpenSSL.crypto.dump_certificate(OpenSSL.crypto.FILETYPE_PEM,
                                                  source_x509_ca)

  # Random seed shared by the magic values of all disks in this import
  magic_base = utils.GenerateSecret(6)

  # Decide whether to use IPv6
  ipv6 = netutils.IP6Address.IsValid(pnode.primary_ip)

  # Create crypto key
  result = lu.rpc.call_x509_cert_create(instance.primary_node,
                                        constants.RIE_CERT_VALIDITY)
  result.Raise("Can't create X509 key and certificate on %s" % result.node)

  (x509_key_name, x509_cert_pem) = result.payload
  try:
    # Load certificate
    x509_cert = OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_PEM,
                                                x509_cert_pem)

    # Sign certificate with the cluster domain secret so the source cluster
    # can verify it
    signed_x509_cert_pem = \
      utils.SignX509Certificate(x509_cert, cds, utils.GenerateSecret(8))

    cbs = _RemoteImportCb(feedback_fn, cds, signed_x509_cert_pem,
                          len(instance.disks), pnode.primary_ip)

    ieloop = ImportExportLoop(lu)
    try:
      for idx, dev in enumerate(instance.disks):
        magic = _GetInstDiskMagic(magic_base, instance.name, idx)

        # Import daemon options
        opts = objects.ImportExportOptions(key_name=x509_key_name,
                                           ca_pem=source_ca_pem,
                                           magic=magic, ipv6=ipv6)

        ieloop.Add(DiskImport(lu, instance.primary_node, opts, instance,
                              constants.IEIO_SCRIPT, (dev, idx),
                              timeouts, cbs, private=(idx, )))

      ieloop.Run()
    finally:
      ieloop.FinalizeAll()
  finally:
    # Remove crypto key and certificate
    result = lu.rpc.call_x509_cert_remove(instance.primary_node, x509_key_name)
    # NOTE(review): raising in this finally block replaces any exception from
    # the try body above -- confirm this masking is intended
    result.Raise("Can't remove X509 key and certificate on %s" % result.node)

  return cbs.disk_results
def _GetImportExportHandshakeMessage(version):
  """Returns the handshake message for a RIE protocol version.

  @type version: number

  """
  return ":".join([str(version), str(constants.RIE_HANDSHAKE)])
def ComputeRemoteExportHandshake(cds):
  """Computes the remote import/export handshake.

  @type cds: string
  @param cds: Cluster domain secret

  """
  hmac_salt = utils.GenerateSecret(8)
  message = _GetImportExportHandshakeMessage(constants.RIE_VERSION)
  hmac_digest = utils.Sha1Hmac(cds, message, salt=hmac_salt)
  return (constants.RIE_VERSION, hmac_digest, hmac_salt)
def CheckRemoteExportHandshake(cds, handshake):
1503
  """Checks the handshake of a remote import/export.
1504

1505
  @type cds: string
1506
  @param cds: Cluster domain secret
1507
  @type handshake: sequence
1508
  @param handshake: Handshake sent by remote peer
1509

1510
  """
1511
  try:
1512
    (version, hmac_digest, hmac_salt) = handshake
1513
  except (TypeError, ValueError), err:
1514
    return "Invalid data: %s" % err
1515

    
1516
  if not utils.VerifySha1Hmac(cds, _GetImportExportHandshakeMessage(version),
1517
                              hmac_digest, salt=hmac_salt):
1518
    return "Hash didn't match, clusters don't share the same domain secret"
1519

    
1520
  if version != constants.RIE_VERSION:
1521
    return ("Clusters don't have the same remote import/export protocol"
1522
            " (local=%s, remote=%s)" %
1523
            (constants.RIE_VERSION, version))
1524

    
1525
  return None
1526

    
1527

    
1528
def _GetRieDiskInfoMessage(disk_index, host, port, magic):
1529
  """Returns the hashed text for import/export disk information.
1530

1531
  @type disk_index: number
1532
  @param disk_index: Index of disk (included in hash)
1533
  @type host: string
1534
  @param host: Hostname
1535
  @type port: number
1536
  @param port: Daemon port
1537
  @type magic: string
1538
  @param magic: Magic value
1539

1540
  """
1541
  return "%s:%s:%s:%s" % (disk_index, host, port, magic)
1542

    
1543

    
1544
def CheckRemoteExportDiskInfo(cds, disk_index, disk_info):
1545
  """Verifies received disk information for an export.
1546

1547
  @type cds: string
1548
  @param cds: Cluster domain secret
1549
  @type disk_index: number
1550
  @param disk_index: Index of disk (included in hash)
1551
  @type disk_info: sequence
1552
  @param disk_info: Disk information sent by remote peer
1553

1554
  """
1555
  try:
1556
    (host, port, magic, hmac_digest, hmac_salt) = disk_info
1557
  except (TypeError, ValueError), err:
1558
    raise errors.GenericError("Invalid data: %s" % err)
1559

    
1560
  if not (host and port and magic):
1561
    raise errors.GenericError("Missing destination host, port or magic")
1562

    
1563
  msg = _GetRieDiskInfoMessage(disk_index, host, port, magic)
1564

    
1565
  if not utils.VerifySha1Hmac(cds, msg, hmac_digest, salt=hmac_salt):
1566
    raise errors.GenericError("HMAC is wrong")
1567

    
1568
  if netutils.IP6Address.IsValid(host) or netutils.IP4Address.IsValid(host):
1569
    destination = host
1570
  else:
1571
    destination = netutils.Hostname.GetNormalizedName(host)
1572

    
1573
  return (destination,
1574
          utils.ValidateServiceName(port),
1575
          magic)
1576

    
1577

    
1578
def ComputeRemoteImportDiskInfo(cds, salt, disk_index, host, port, magic):
  """Computes the signed disk information for a remote import.

  @type cds: string
  @param cds: Cluster domain secret
  @type salt: string
  @param salt: HMAC salt
  @type disk_index: number
  @param disk_index: Index of disk (included in hash)
  @type host: string
  @param host: Hostname
  @type port: number
  @param port: Daemon port
  @type magic: string
  @param magic: Magic value

  """
  hashed_text = _GetRieDiskInfoMessage(disk_index, host, port, magic)
  return (host, port, magic,
          utils.Sha1Hmac(cds, hashed_text, salt=salt),
          salt)