Statistics
| Branch: | Tag: | Revision:

root / lib / masterd / instance.py @ b705c7a6

History | View | Annotate | Download (43.4 kB)

1
#
2
#
3

    
4
# Copyright (C) 2010 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Instance-related functions and classes for masterd.
23

24
"""
25

    
26
import logging
27
import time
28
import OpenSSL
29

    
30
from ganeti import constants
31
from ganeti import errors
32
from ganeti import compat
33
from ganeti import utils
34
from ganeti import objects
35
from ganeti import netutils
36

    
37

    
38
class _ImportExportError(Exception):
39
  """Local exception to report import/export errors.
40

41
  """
42

    
43

    
44
class ImportExportTimeouts(object):
45
  #: Time until daemon starts writing status file
46
  DEFAULT_READY_TIMEOUT = 10
47

    
48
  #: Length of time until errors cause hard failure
49
  DEFAULT_ERROR_TIMEOUT = 10
50

    
51
  #: Time after which daemon must be listening
52
  DEFAULT_LISTEN_TIMEOUT = 10
53

    
54
  #: Progress update interval
55
  DEFAULT_PROGRESS_INTERVAL = 60
56

    
57
  __slots__ = [
58
    "error",
59
    "ready",
60
    "listen",
61
    "connect",
62
    "progress",
63
    ]
64

    
65
  def __init__(self, connect,
66
               listen=DEFAULT_LISTEN_TIMEOUT,
67
               error=DEFAULT_ERROR_TIMEOUT,
68
               ready=DEFAULT_READY_TIMEOUT,
69
               progress=DEFAULT_PROGRESS_INTERVAL):
70
    """Initializes this class.
71

72
    @type connect: number
73
    @param connect: Timeout for establishing connection
74
    @type listen: number
75
    @param listen: Timeout for starting to listen for connections
76
    @type error: number
77
    @param error: Length of time until errors cause hard failure
78
    @type ready: number
79
    @param ready: Timeout for daemon to become ready
80
    @type progress: number
81
    @param progress: Progress update interval
82

83
    """
84
    self.error = error
85
    self.ready = ready
86
    self.listen = listen
87
    self.connect = connect
88
    self.progress = progress
89

    
90

    
91
class ImportExportCbBase(object):
92
  """Callbacks for disk import/export.
93

94
  """
95
  def ReportListening(self, ie, private):
96
    """Called when daemon started listening.
97

98
    @type ie: Subclass of L{_DiskImportExportBase}
99
    @param ie: Import/export object
100
    @param private: Private data passed to import/export object
101

102
    """
103

    
104
  def ReportConnected(self, ie, private):
105
    """Called when a connection has been established.
106

107
    @type ie: Subclass of L{_DiskImportExportBase}
108
    @param ie: Import/export object
109
    @param private: Private data passed to import/export object
110

111
    """
112

    
113
  def ReportProgress(self, ie, private):
114
    """Called when new progress information should be reported.
115

116
    @type ie: Subclass of L{_DiskImportExportBase}
117
    @param ie: Import/export object
118
    @param private: Private data passed to import/export object
119

120
    """
121

    
122
  def ReportFinished(self, ie, private):
123
    """Called when a transfer has finished.
124

125
    @type ie: Subclass of L{_DiskImportExportBase}
126
    @param ie: Import/export object
127
    @param private: Private data passed to import/export object
128

129
    """
130

    
131

    
132
def _TimeoutExpired(epoch, timeout, _time_fn=time.time):
133
  """Checks whether a timeout has expired.
134

135
  """
136
  return _time_fn() > (epoch + timeout)
137

    
138

    
139
class _DiskImportExportBase(object):
140
  MODE_TEXT = None
141

    
142
  def __init__(self, lu, node_name, opts,
143
               instance, timeouts, cbs, private=None):
144
    """Initializes this class.
145

146
    @param lu: Logical unit instance
147
    @type node_name: string
148
    @param node_name: Node name for import
149
    @type opts: L{objects.ImportExportOptions}
150
    @param opts: Import/export daemon options
151
    @type instance: L{objects.Instance}
152
    @param instance: Instance object
153
    @type timeouts: L{ImportExportTimeouts}
154
    @param timeouts: Timeouts for this import
155
    @type cbs: L{ImportExportCbBase}
156
    @param cbs: Callbacks
157
    @param private: Private data for callback functions
158

159
    """
160
    assert self.MODE_TEXT
161

    
162
    self._lu = lu
163
    self.node_name = node_name
164
    self._opts = opts
165
    self._instance = instance
166
    self._timeouts = timeouts
167
    self._cbs = cbs
168
    self._private = private
169

    
170
    # Parent loop
171
    self._loop = None
172

    
173
    # Timestamps
174
    self._ts_begin = None
175
    self._ts_connected = None
176
    self._ts_finished = None
177
    self._ts_cleanup = None
178
    self._ts_last_progress = None
179
    self._ts_last_error = None
180

    
181
    # Transfer status
182
    self.success = None
183
    self.final_message = None
184

    
185
    # Daemon status
186
    self._daemon_name = None
187
    self._daemon = None
188

    
189
  @property
190
  def recent_output(self):
191
    """Returns the most recent output from the daemon.
192

193
    """
194
    if self._daemon:
195
      return self._daemon.recent_output
196

    
197
    return None
198

    
199
  @property
200
  def progress(self):
201
    """Returns transfer progress information.
202

203
    """
204
    if not self._daemon:
205
      return None
206

    
207
    return (self._daemon.progress_mbytes,
208
            self._daemon.progress_throughput,
209
            self._daemon.progress_percent,
210
            self._daemon.progress_eta)
211

    
212
  @property
213
  def magic(self):
214
    """Returns the magic value for this import/export.
215

216
    """
217
    return self._opts.magic
218

    
219
  @property
220
  def active(self):
221
    """Determines whether this transport is still active.
222

223
    """
224
    return self.success is None
225

    
226
  @property
227
  def loop(self):
228
    """Returns parent loop.
229

230
    @rtype: L{ImportExportLoop}
231

232
    """
233
    return self._loop
234

    
235
  def SetLoop(self, loop):
236
    """Sets the parent loop.
237

238
    @type loop: L{ImportExportLoop}
239

240
    """
241
    if self._loop:
242
      raise errors.ProgrammerError("Loop can only be set once")
243

    
244
    self._loop = loop
245

    
246
  def _StartDaemon(self):
247
    """Starts the import/export daemon.
248

249
    """
250
    raise NotImplementedError()
251

    
252
  def CheckDaemon(self):
253
    """Checks whether daemon has been started and if not, starts it.
254

255
    @rtype: string
256
    @return: Daemon name
257

258
    """
259
    assert self._ts_cleanup is None
260

    
261
    if self._daemon_name is None:
262
      assert self._ts_begin is None
263

    
264
      result = self._StartDaemon()
265
      if result.fail_msg:
266
        raise _ImportExportError("Failed to start %s on %s: %s" %
267
                                 (self.MODE_TEXT, self.node_name,
268
                                  result.fail_msg))
269

    
270
      daemon_name = result.payload
271

    
272
      logging.info("Started %s %r on %s", self.MODE_TEXT, daemon_name,
273
                   self.node_name)
274

    
275
      self._ts_begin = time.time()
276
      self._daemon_name = daemon_name
277

    
278
    return self._daemon_name
279

    
280
  def GetDaemonName(self):
281
    """Returns the daemon name.
282

283
    """
284
    assert self._daemon_name, "Daemon has not been started"
285
    assert self._ts_cleanup is None
286
    return self._daemon_name
287

    
288
  def Abort(self):
289
    """Sends SIGTERM to import/export daemon (if still active).
290

291
    """
292
    if self._daemon_name:
293
      self._lu.LogWarning("Aborting %s %r on %s",
294
                          self.MODE_TEXT, self._daemon_name, self.node_name)
295
      result = self._lu.rpc.call_impexp_abort(self.node_name, self._daemon_name)
296
      if result.fail_msg:
297
        self._lu.LogWarning("Failed to abort %s %r on %s: %s",
298
                            self.MODE_TEXT, self._daemon_name,
299
                            self.node_name, result.fail_msg)
300
        return False
301

    
302
    return True
303

    
304
  def _SetDaemonData(self, data):
305
    """Internal function for updating status daemon data.
306

307
    @type data: L{objects.ImportExportStatus}
308
    @param data: Daemon status data
309

310
    """
311
    assert self._ts_begin is not None
312

    
313
    if not data:
314
      if _TimeoutExpired(self._ts_begin, self._timeouts.ready):
315
        raise _ImportExportError("Didn't become ready after %s seconds" %
316
                                 self._timeouts.ready)
317

    
318
      return False
319

    
320
    self._daemon = data
321

    
322
    return True
323

    
324
  def SetDaemonData(self, success, data):
325
    """Updates daemon status data.
326

327
    @type success: bool
328
    @param success: Whether fetching data was successful or not
329
    @type data: L{objects.ImportExportStatus}
330
    @param data: Daemon status data
331

332
    """
333
    if not success:
334
      if self._ts_last_error is None:
335
        self._ts_last_error = time.time()
336

    
337
      elif _TimeoutExpired(self._ts_last_error, self._timeouts.error):
338
        raise _ImportExportError("Too many errors while updating data")
339

    
340
      return False
341

    
342
    self._ts_last_error = None
343

    
344
    return self._SetDaemonData(data)
345

    
346
  def CheckListening(self):
347
    """Checks whether the daemon is listening.
348

349
    """
350
    raise NotImplementedError()
351

    
352
  def _GetConnectedCheckEpoch(self):
353
    """Returns timeout to calculate connect timeout.
354

355
    """
356
    raise NotImplementedError()
357

    
358
  def CheckConnected(self):
359
    """Checks whether the daemon is connected.
360

361
    @rtype: bool
362
    @return: Whether the daemon is connected
363

364
    """
365
    assert self._daemon, "Daemon status missing"
366

    
367
    if self._ts_connected is not None:
368
      return True
369

    
370
    if self._daemon.connected:
371
      self._ts_connected = time.time()
372

    
373
      # TODO: Log remote peer
374
      logging.debug("%s %r on %s is now connected",
375
                    self.MODE_TEXT, self._daemon_name, self.node_name)
376

    
377
      self._cbs.ReportConnected(self, self._private)
378

    
379
      return True
380

    
381
    if _TimeoutExpired(self._GetConnectedCheckEpoch(), self._timeouts.connect):
382
      raise _ImportExportError("Not connected after %s seconds" %
383
                               self._timeouts.connect)
384

    
385
    return False
386

    
387
  def _CheckProgress(self):
388
    """Checks whether a progress update should be reported.
389

390
    """
391
    if ((self._ts_last_progress is None or
392
         _TimeoutExpired(self._ts_last_progress, self._timeouts.progress)) and
393
        self._daemon and
394
        self._daemon.progress_mbytes is not None and
395
        self._daemon.progress_throughput is not None):
396
      self._cbs.ReportProgress(self, self._private)
397
      self._ts_last_progress = time.time()
398

    
399
  def CheckFinished(self):
400
    """Checks whether the daemon exited.
401

402
    @rtype: bool
403
    @return: Whether the transfer is finished
404

405
    """
406
    assert self._daemon, "Daemon status missing"
407

    
408
    if self._ts_finished:
409
      return True
410

    
411
    if self._daemon.exit_status is None:
412
      # TODO: Adjust delay for ETA expiring soon
413
      self._CheckProgress()
414
      return False
415

    
416
    self._ts_finished = time.time()
417

    
418
    self._ReportFinished(self._daemon.exit_status == 0,
419
                         self._daemon.error_message)
420

    
421
    return True
422

    
423
  def _ReportFinished(self, success, message):
424
    """Transfer is finished or daemon exited.
425

426
    @type success: bool
427
    @param success: Whether the transfer was successful
428
    @type message: string
429
    @param message: Error message
430

431
    """
432
    assert self.success is None
433

    
434
    self.success = success
435
    self.final_message = message
436

    
437
    if success:
438
      logging.info("%s %r on %s succeeded", self.MODE_TEXT, self._daemon_name,
439
                   self.node_name)
440
    elif self._daemon_name:
441
      self._lu.LogWarning("%s %r on %s failed: %s",
442
                          self.MODE_TEXT, self._daemon_name, self.node_name,
443
                          message)
444
    else:
445
      self._lu.LogWarning("%s on %s failed: %s", self.MODE_TEXT,
446
                          self.node_name, message)
447

    
448
    self._cbs.ReportFinished(self, self._private)
449

    
450
  def _Finalize(self):
451
    """Makes the RPC call to finalize this import/export.
452

453
    """
454
    return self._lu.rpc.call_impexp_cleanup(self.node_name, self._daemon_name)
455

    
456
  def Finalize(self, error=None):
457
    """Finalizes this import/export.
458

459
    """
460
    if self._daemon_name:
461
      logging.info("Finalizing %s %r on %s",
462
                   self.MODE_TEXT, self._daemon_name, self.node_name)
463

    
464
      result = self._Finalize()
465
      if result.fail_msg:
466
        self._lu.LogWarning("Failed to finalize %s %r on %s: %s",
467
                            self.MODE_TEXT, self._daemon_name,
468
                            self.node_name, result.fail_msg)
469
        return False
470

    
471
      # Daemon is no longer running
472
      self._daemon_name = None
473
      self._ts_cleanup = time.time()
474

    
475
    if error:
476
      self._ReportFinished(False, error)
477

    
478
    return True
479

    
480

    
481
class DiskImport(_DiskImportExportBase):
482
  MODE_TEXT = "import"
483

    
484
  def __init__(self, lu, node_name, opts, instance,
485
               dest, dest_args, timeouts, cbs, private=None):
486
    """Initializes this class.
487

488
    @param lu: Logical unit instance
489
    @type node_name: string
490
    @param node_name: Node name for import
491
    @type opts: L{objects.ImportExportOptions}
492
    @param opts: Import/export daemon options
493
    @type instance: L{objects.Instance}
494
    @param instance: Instance object
495
    @param dest: I/O destination
496
    @param dest_args: I/O arguments
497
    @type timeouts: L{ImportExportTimeouts}
498
    @param timeouts: Timeouts for this import
499
    @type cbs: L{ImportExportCbBase}
500
    @param cbs: Callbacks
501
    @param private: Private data for callback functions
502

503
    """
504
    _DiskImportExportBase.__init__(self, lu, node_name, opts,
505
                                   instance, timeouts, cbs, private)
506
    self._dest = dest
507
    self._dest_args = dest_args
508

    
509
    # Timestamps
510
    self._ts_listening = None
511

    
512
  @property
513
  def listen_port(self):
514
    """Returns the port the daemon is listening on.
515

516
    """
517
    if self._daemon:
518
      return self._daemon.listen_port
519

    
520
    return None
521

    
522
  def _StartDaemon(self):
523
    """Starts the import daemon.
524

525
    """
526
    return self._lu.rpc.call_import_start(self.node_name, self._opts,
527
                                          self._instance,
528
                                          self._dest, self._dest_args)
529

    
530
  def CheckListening(self):
531
    """Checks whether the daemon is listening.
532

533
    @rtype: bool
534
    @return: Whether the daemon is listening
535

536
    """
537
    assert self._daemon, "Daemon status missing"
538

    
539
    if self._ts_listening is not None:
540
      return True
541

    
542
    port = self._daemon.listen_port
543
    if port is not None:
544
      self._ts_listening = time.time()
545

    
546
      logging.debug("Import %r on %s is now listening on port %s",
547
                    self._daemon_name, self.node_name, port)
548

    
549
      self._cbs.ReportListening(self, self._private)
550

    
551
      return True
552

    
553
    if _TimeoutExpired(self._ts_begin, self._timeouts.listen):
554
      raise _ImportExportError("Not listening after %s seconds" %
555
                               self._timeouts.listen)
556

    
557
    return False
558

    
559
  def _GetConnectedCheckEpoch(self):
560
    """Returns the time since we started listening.
561

562
    """
563
    assert self._ts_listening is not None, \
564
           ("Checking whether an import is connected is only useful"
565
            " once it's been listening")
566

    
567
    return self._ts_listening
568

    
569

    
570
class DiskExport(_DiskImportExportBase):
571
  MODE_TEXT = "export"
572

    
573
  def __init__(self, lu, node_name, opts,
574
               dest_host, dest_port, instance, source, source_args,
575
               timeouts, cbs, private=None):
576
    """Initializes this class.
577

578
    @param lu: Logical unit instance
579
    @type node_name: string
580
    @param node_name: Node name for import
581
    @type opts: L{objects.ImportExportOptions}
582
    @param opts: Import/export daemon options
583
    @type dest_host: string
584
    @param dest_host: Destination host name or IP address
585
    @type dest_port: number
586
    @param dest_port: Destination port number
587
    @type instance: L{objects.Instance}
588
    @param instance: Instance object
589
    @param source: I/O source
590
    @param source_args: I/O source
591
    @type timeouts: L{ImportExportTimeouts}
592
    @param timeouts: Timeouts for this import
593
    @type cbs: L{ImportExportCbBase}
594
    @param cbs: Callbacks
595
    @param private: Private data for callback functions
596

597
    """
598
    _DiskImportExportBase.__init__(self, lu, node_name, opts,
599
                                   instance, timeouts, cbs, private)
600
    self._dest_host = dest_host
601
    self._dest_port = dest_port
602
    self._source = source
603
    self._source_args = source_args
604

    
605
  def _StartDaemon(self):
606
    """Starts the export daemon.
607

608
    """
609
    return self._lu.rpc.call_export_start(self.node_name, self._opts,
610
                                          self._dest_host, self._dest_port,
611
                                          self._instance, self._source,
612
                                          self._source_args)
613

    
614
  def CheckListening(self):
615
    """Checks whether the daemon is listening.
616

617
    """
618
    # Only an import can be listening
619
    return True
620

    
621
  def _GetConnectedCheckEpoch(self):
622
    """Returns the time since the daemon started.
623

624
    """
625
    assert self._ts_begin is not None
626

    
627
    return self._ts_begin
628

    
629

    
630
def FormatProgress(progress):
631
  """Formats progress information for user consumption
632

633
  """
634
  (mbytes, throughput, percent, eta) = progress
635

    
636
  parts = [
637
    utils.FormatUnit(mbytes, "h"),
638

    
639
    # Not using FormatUnit as it doesn't support kilobytes
640
    "%0.1f MiB/s" % throughput,
641
    ]
642

    
643
  if percent is not None:
644
    parts.append("%d%%" % percent)
645

    
646
  if eta is not None:
647
    parts.append("ETA %s" % utils.FormatSeconds(eta))
648

    
649
  return utils.CommaJoin(parts)
650

    
651

    
652
class ImportExportLoop:
653
  MIN_DELAY = 1.0
654
  MAX_DELAY = 20.0
655

    
656
  def __init__(self, lu):
657
    """Initializes this class.
658

659
    """
660
    self._lu = lu
661
    self._queue = []
662
    self._pending_add = []
663

    
664
  def Add(self, diskie):
665
    """Adds an import/export object to the loop.
666

667
    @type diskie: Subclass of L{_DiskImportExportBase}
668
    @param diskie: Import/export object
669

670
    """
671
    assert diskie not in self._pending_add
672
    assert diskie.loop is None
673

    
674
    diskie.SetLoop(self)
675

    
676
    # Adding new objects to a staging list is necessary, otherwise the main
677
    # loop gets confused if callbacks modify the queue while the main loop is
678
    # iterating over it.
679
    self._pending_add.append(diskie)
680

    
681
  @staticmethod
682
  def _CollectDaemonStatus(lu, daemons):
683
    """Collects the status for all import/export daemons.
684

685
    """
686
    daemon_status = {}
687

    
688
    for node_name, names in daemons.iteritems():
689
      result = lu.rpc.call_impexp_status(node_name, names)
690
      if result.fail_msg:
691
        lu.LogWarning("Failed to get daemon status on %s: %s",
692
                      node_name, result.fail_msg)
693
        continue
694

    
695
      assert len(names) == len(result.payload)
696

    
697
      daemon_status[node_name] = dict(zip(names, result.payload))
698

    
699
    return daemon_status
700

    
701
  @staticmethod
702
  def _GetActiveDaemonNames(queue):
703
    """Gets the names of all active daemons.
704

705
    """
706
    result = {}
707
    for diskie in queue:
708
      if not diskie.active:
709
        continue
710

    
711
      try:
712
        # Start daemon if necessary
713
        daemon_name = diskie.CheckDaemon()
714
      except _ImportExportError, err:
715
        logging.exception("%s failed", diskie.MODE_TEXT)
716
        diskie.Finalize(error=str(err))
717
        continue
718

    
719
      result.setdefault(diskie.node_name, []).append(daemon_name)
720

    
721
    assert len(queue) >= len(result)
722
    assert len(queue) >= sum([len(names) for names in result.itervalues()])
723

    
724
    logging.debug("daemons=%r", result)
725

    
726
    return result
727

    
728
  def _AddPendingToQueue(self):
729
    """Adds all pending import/export objects to the internal queue.
730

731
    """
732
    assert compat.all(diskie not in self._queue and diskie.loop == self
733
                      for diskie in self._pending_add)
734

    
735
    self._queue.extend(self._pending_add)
736

    
737
    del self._pending_add[:]
738

    
739
  def Run(self):
740
    """Utility main loop.
741

742
    """
743
    while True:
744
      self._AddPendingToQueue()
745

    
746
      # Collect all active daemon names
747
      daemons = self._GetActiveDaemonNames(self._queue)
748
      if not daemons:
749
        break
750

    
751
      # Collection daemon status data
752
      data = self._CollectDaemonStatus(self._lu, daemons)
753

    
754
      # Use data
755
      delay = self.MAX_DELAY
756
      for diskie in self._queue:
757
        if not diskie.active:
758
          continue
759

    
760
        try:
761
          try:
762
            all_daemon_data = data[diskie.node_name]
763
          except KeyError:
764
            result = diskie.SetDaemonData(False, None)
765
          else:
766
            result = \
767
              diskie.SetDaemonData(True,
768
                                   all_daemon_data[diskie.GetDaemonName()])
769

    
770
          if not result:
771
            # Daemon not yet ready, retry soon
772
            delay = min(3.0, delay)
773
            continue
774

    
775
          if diskie.CheckFinished():
776
            # Transfer finished
777
            diskie.Finalize()
778
            continue
779

    
780
          # Normal case: check again in 5 seconds
781
          delay = min(5.0, delay)
782

    
783
          if not diskie.CheckListening():
784
            # Not yet listening, retry soon
785
            delay = min(1.0, delay)
786
            continue
787

    
788
          if not diskie.CheckConnected():
789
            # Not yet connected, retry soon
790
            delay = min(1.0, delay)
791
            continue
792

    
793
        except _ImportExportError, err:
794
          logging.exception("%s failed", diskie.MODE_TEXT)
795
          diskie.Finalize(error=str(err))
796

    
797
      if not compat.any(diskie.active for diskie in self._queue):
798
        break
799

    
800
      # Wait a bit
801
      delay = min(self.MAX_DELAY, max(self.MIN_DELAY, delay))
802
      logging.debug("Waiting for %ss", delay)
803
      time.sleep(delay)
804

    
805
  def FinalizeAll(self):
806
    """Finalizes all pending transfers.
807

808
    """
809
    success = True
810

    
811
    for diskie in self._queue:
812
      success = diskie.Finalize() and success
813

    
814
    return success
815

    
816

    
817
class _TransferInstCbBase(ImportExportCbBase):
818
  def __init__(self, lu, feedback_fn, instance, timeouts, src_node, src_cbs,
819
               dest_node, dest_ip):
820
    """Initializes this class.
821

822
    """
823
    ImportExportCbBase.__init__(self)
824

    
825
    self.lu = lu
826
    self.feedback_fn = feedback_fn
827
    self.instance = instance
828
    self.timeouts = timeouts
829
    self.src_node = src_node
830
    self.src_cbs = src_cbs
831
    self.dest_node = dest_node
832
    self.dest_ip = dest_ip
833

    
834

    
835
class _TransferInstSourceCb(_TransferInstCbBase):
836
  def ReportConnected(self, ie, dtp):
837
    """Called when a connection has been established.
838

839
    """
840
    assert self.src_cbs is None
841
    assert dtp.src_export == ie
842
    assert dtp.dest_import
843

    
844
    self.feedback_fn("%s is sending data on %s" %
845
                     (dtp.data.name, ie.node_name))
846

    
847
  def ReportProgress(self, ie, dtp):
848
    """Called when new progress information should be reported.
849

850
    """
851
    progress = ie.progress
852
    if not progress:
853
      return
854

    
855
    self.feedback_fn("%s sent %s" % (dtp.data.name, FormatProgress(progress)))
856

    
857
  def ReportFinished(self, ie, dtp):
858
    """Called when a transfer has finished.
859

860
    """
861
    assert self.src_cbs is None
862
    assert dtp.src_export == ie
863
    assert dtp.dest_import
864

    
865
    if ie.success:
866
      self.feedback_fn("%s finished sending data" % dtp.data.name)
867
    else:
868
      self.feedback_fn("%s failed to send data: %s (recent output: %r)" %
869
                       (dtp.data.name, ie.final_message, ie.recent_output))
870

    
871
    dtp.RecordResult(ie.success)
872

    
873
    cb = dtp.data.finished_fn
874
    if cb:
875
      cb()
876

    
877
    # TODO: Check whether sending SIGTERM right away is okay, maybe we should
878
    # give the daemon a moment to sort things out
879
    if dtp.dest_import and not ie.success:
880
      dtp.dest_import.Abort()
881

    
882

    
883
class _TransferInstDestCb(_TransferInstCbBase):
884
  def ReportListening(self, ie, dtp):
885
    """Called when daemon started listening.
886

887
    """
888
    assert self.src_cbs
889
    assert dtp.src_export is None
890
    assert dtp.dest_import
891
    assert dtp.export_opts
892

    
893
    self.feedback_fn("%s is now listening, starting export" % dtp.data.name)
894

    
895
    # Start export on source node
896
    de = DiskExport(self.lu, self.src_node, dtp.export_opts,
897
                    self.dest_ip, ie.listen_port,
898
                    self.instance, dtp.data.src_io, dtp.data.src_ioargs,
899
                    self.timeouts, self.src_cbs, private=dtp)
900
    ie.loop.Add(de)
901

    
902
    dtp.src_export = de
903

    
904
  def ReportConnected(self, ie, dtp):
905
    """Called when a connection has been established.
906

907
    """
908
    self.feedback_fn("%s is receiving data on %s" %
909
                     (dtp.data.name, self.dest_node))
910

    
911
  def ReportFinished(self, ie, dtp):
912
    """Called when a transfer has finished.
913

914
    """
915
    if ie.success:
916
      self.feedback_fn("%s finished receiving data" % dtp.data.name)
917
    else:
918
      self.feedback_fn("%s failed to receive data: %s (recent output: %r)" %
919
                       (dtp.data.name, ie.final_message, ie.recent_output))
920

    
921
    dtp.RecordResult(ie.success)
922

    
923
    # TODO: Check whether sending SIGTERM right away is okay, maybe we should
924
    # give the daemon a moment to sort things out
925
    if dtp.src_export and not ie.success:
926
      dtp.src_export.Abort()
927

    
928

    
929
class DiskTransfer(object):
930
  def __init__(self, name, src_io, src_ioargs, dest_io, dest_ioargs,
931
               finished_fn):
932
    """Initializes this class.
933

934
    @type name: string
935
    @param name: User-visible name for this transfer (e.g. "disk/0")
936
    @param src_io: Source I/O type
937
    @param src_ioargs: Source I/O arguments
938
    @param dest_io: Destination I/O type
939
    @param dest_ioargs: Destination I/O arguments
940
    @type finished_fn: callable
941
    @param finished_fn: Function called once transfer has finished
942

943
    """
944
    self.name = name
945

    
946
    self.src_io = src_io
947
    self.src_ioargs = src_ioargs
948

    
949
    self.dest_io = dest_io
950
    self.dest_ioargs = dest_ioargs
951

    
952
    self.finished_fn = finished_fn
953

    
954

    
955
class _DiskTransferPrivate(object):
956
  def __init__(self, data, success, export_opts):
957
    """Initializes this class.
958

959
    @type data: L{DiskTransfer}
960
    @type success: bool
961

962
    """
963
    self.data = data
964
    self.success = success
965
    self.export_opts = export_opts
966

    
967
    self.src_export = None
968
    self.dest_import = None
969

    
970
  def RecordResult(self, success):
971
    """Updates the status.
972

973
    One failed part will cause the whole transfer to fail.
974

975
    """
976
    self.success = self.success and success
977

    
978

    
979
def _GetInstDiskMagic(base, instance_name, index):
980
  """Computes the magic value for a disk export or import.
981

982
  @type base: string
983
  @param base: Random seed value (can be the same for all disks of a transfer)
984
  @type instance_name: string
985
  @param instance_name: Name of instance
986
  @type index: number
987
  @param index: Disk index
988

989
  """
990
  h = compat.sha1_hash()
991
  h.update(str(constants.RIE_VERSION))
992
  h.update(base)
993
  h.update(instance_name)
994
  h.update(str(index))
995
  return h.hexdigest()
996

    
997

    
998
def TransferInstanceData(lu, feedback_fn, src_node, dest_node, dest_ip,
999
                         instance, all_transfers):
1000
  """Transfers an instance's data from one node to another.
1001

1002
  @param lu: Logical unit instance
1003
  @param feedback_fn: Feedback function
1004
  @type src_node: string
1005
  @param src_node: Source node name
1006
  @type dest_node: string
1007
  @param dest_node: Destination node name
1008
  @type dest_ip: string
1009
  @param dest_ip: IP address of destination node
1010
  @type instance: L{objects.Instance}
1011
  @param instance: Instance object
1012
  @type all_transfers: list of L{DiskTransfer} instances
1013
  @param all_transfers: List of all disk transfers to be made
1014
  @rtype: list
1015
  @return: List with a boolean (True=successful, False=failed) for success for
1016
           each transfer
1017

1018
  """
1019
  # Disable compression for all moves as these are all within the same cluster
1020
  compress = constants.IEC_NONE
1021

    
1022
  logging.debug("Source node %s, destination node %s, compression '%s'",
1023
                src_node, dest_node, compress)
1024

    
1025
  timeouts = ImportExportTimeouts(constants.DISK_TRANSFER_CONNECT_TIMEOUT)
1026
  src_cbs = _TransferInstSourceCb(lu, feedback_fn, instance, timeouts,
1027
                                  src_node, None, dest_node, dest_ip)
1028
  dest_cbs = _TransferInstDestCb(lu, feedback_fn, instance, timeouts,
1029
                                 src_node, src_cbs, dest_node, dest_ip)
1030

    
1031
  all_dtp = []
1032

    
1033
  base_magic = utils.GenerateSecret(6)
1034

    
1035
  ieloop = ImportExportLoop(lu)
1036
  try:
1037
    for idx, transfer in enumerate(all_transfers):
1038
      if transfer:
1039
        feedback_fn("Exporting %s from %s to %s" %
1040
                    (transfer.name, src_node, dest_node))
1041

    
1042
        magic = _GetInstDiskMagic(base_magic, instance.name, idx)
1043
        opts = objects.ImportExportOptions(key_name=None, ca_pem=None,
1044
                                           compress=compress, magic=magic)
1045

    
1046
        dtp = _DiskTransferPrivate(transfer, True, opts)
1047

    
1048
        di = DiskImport(lu, dest_node, opts, instance,
1049
                        transfer.dest_io, transfer.dest_ioargs,
1050
                        timeouts, dest_cbs, private=dtp)
1051
        ieloop.Add(di)
1052

    
1053
        dtp.dest_import = di
1054
      else:
1055
        dtp = _DiskTransferPrivate(None, False, None)
1056

    
1057
      all_dtp.append(dtp)
1058

    
1059
    ieloop.Run()
1060
  finally:
1061
    ieloop.FinalizeAll()
1062

    
1063
  assert len(all_dtp) == len(all_transfers)
1064
  assert compat.all((dtp.src_export is None or
1065
                      dtp.src_export.success is not None) and
1066
                     (dtp.dest_import is None or
1067
                      dtp.dest_import.success is not None)
1068
                     for dtp in all_dtp), \
1069
         "Not all imports/exports are finalized"
1070

    
1071
  return [bool(dtp.success) for dtp in all_dtp]
1072

    
1073

    
1074
class _RemoteExportCb(ImportExportCbBase):
1075
  def __init__(self, feedback_fn, disk_count):
1076
    """Initializes this class.
1077

1078
    """
1079
    ImportExportCbBase.__init__(self)
1080
    self._feedback_fn = feedback_fn
1081
    self._dresults = [None] * disk_count
1082

    
1083
  @property
1084
  def disk_results(self):
1085
    """Returns per-disk results.
1086

1087
    """
1088
    return self._dresults
1089

    
1090
  def ReportConnected(self, ie, private):
1091
    """Called when a connection has been established.
1092

1093
    """
1094
    (idx, _) = private
1095

    
1096
    self._feedback_fn("Disk %s is now sending data" % idx)
1097

    
1098
  def ReportProgress(self, ie, private):
1099
    """Called when new progress information should be reported.
1100

1101
    """
1102
    (idx, _) = private
1103

    
1104
    progress = ie.progress
1105
    if not progress:
1106
      return
1107

    
1108
    self._feedback_fn("Disk %s sent %s" % (idx, FormatProgress(progress)))
1109

    
1110
  def ReportFinished(self, ie, private):
1111
    """Called when a transfer has finished.
1112

1113
    """
1114
    (idx, finished_fn) = private
1115

    
1116
    if ie.success:
1117
      self._feedback_fn("Disk %s finished sending data" % idx)
1118
    else:
1119
      self._feedback_fn("Disk %s failed to send data: %s (recent output: %r)" %
1120
                        (idx, ie.final_message, ie.recent_output))
1121

    
1122
    self._dresults[idx] = bool(ie.success)
1123

    
1124
    if finished_fn:
1125
      finished_fn()
1126

    
1127

    
1128
class ExportInstanceHelper:
1129
  def __init__(self, lu, feedback_fn, instance):
1130
    """Initializes this class.
1131

1132
    @param lu: Logical unit instance
1133
    @param feedback_fn: Feedback function
1134
    @type instance: L{objects.Instance}
1135
    @param instance: Instance object
1136

1137
    """
1138
    self._lu = lu
1139
    self._feedback_fn = feedback_fn
1140
    self._instance = instance
1141

    
1142
    self._snap_disks = []
1143
    self._removed_snaps = [False] * len(instance.disks)
1144

    
1145
  def CreateSnapshots(self):
1146
    """Creates an LVM snapshot for every disk of the instance.
1147

1148
    """
1149
    assert not self._snap_disks
1150

    
1151
    instance = self._instance
1152
    src_node = instance.primary_node
1153

    
1154
    vgname = self._lu.cfg.GetVGName()
1155

    
1156
    for idx, disk in enumerate(instance.disks):
1157
      self._feedback_fn("Creating a snapshot of disk/%s on node %s" %
1158
                        (idx, src_node))
1159

    
1160
      # result.payload will be a snapshot of an lvm leaf of the one we
1161
      # passed
1162
      result = self._lu.rpc.call_blockdev_snapshot(src_node, disk)
1163
      msg = result.fail_msg
1164
      if msg:
1165
        self._lu.LogWarning("Could not snapshot disk/%s on node %s: %s",
1166
                            idx, src_node, msg)
1167
        new_dev = False
1168
      else:
1169
        disk_id = (vgname, result.payload)
1170
        new_dev = objects.Disk(dev_type=constants.LD_LV, size=disk.size,
1171
                               logical_id=disk_id, physical_id=disk_id,
1172
                               iv_name=disk.iv_name)
1173

    
1174
      self._snap_disks.append(new_dev)
1175

    
1176
    assert len(self._snap_disks) == len(instance.disks)
1177
    assert len(self._removed_snaps) == len(instance.disks)
1178

    
1179
  def _RemoveSnapshot(self, disk_index):
1180
    """Removes an LVM snapshot.
1181

1182
    @type disk_index: number
1183
    @param disk_index: Index of the snapshot to be removed
1184

1185
    """
1186
    disk = self._snap_disks[disk_index]
1187
    if disk and not self._removed_snaps[disk_index]:
1188
      src_node = self._instance.primary_node
1189

    
1190
      self._feedback_fn("Removing snapshot of disk/%s on node %s" %
1191
                        (disk_index, src_node))
1192

    
1193
      result = self._lu.rpc.call_blockdev_remove(src_node, disk)
1194
      if result.fail_msg:
1195
        self._lu.LogWarning("Could not remove snapshot for disk/%d from node"
1196
                            " %s: %s", disk_index, src_node, result.fail_msg)
1197
      else:
1198
        self._removed_snaps[disk_index] = True
1199

    
1200
  def LocalExport(self, dest_node):
1201
    """Intra-cluster instance export.
1202

1203
    @type dest_node: L{objects.Node}
1204
    @param dest_node: Destination node
1205

1206
    """
1207
    instance = self._instance
1208
    src_node = instance.primary_node
1209

    
1210
    assert len(self._snap_disks) == len(instance.disks)
1211

    
1212
    transfers = []
1213

    
1214
    for idx, dev in enumerate(self._snap_disks):
1215
      if not dev:
1216
        transfers.append(None)
1217
        continue
1218

    
1219
      path = utils.PathJoin(constants.EXPORT_DIR, "%s.new" % instance.name,
1220
                            dev.physical_id[1])
1221

    
1222
      finished_fn = compat.partial(self._TransferFinished, idx)
1223

    
1224
      # FIXME: pass debug option from opcode to backend
1225
      dt = DiskTransfer("snapshot/%s" % idx,
1226
                        constants.IEIO_SCRIPT, (dev, idx),
1227
                        constants.IEIO_FILE, (path, ),
1228
                        finished_fn)
1229
      transfers.append(dt)
1230

    
1231
    # Actually export data
1232
    dresults = TransferInstanceData(self._lu, self._feedback_fn,
1233
                                    src_node, dest_node.name,
1234
                                    dest_node.secondary_ip,
1235
                                    instance, transfers)
1236

    
1237
    assert len(dresults) == len(instance.disks)
1238

    
1239
    self._feedback_fn("Finalizing export on %s" % dest_node.name)
1240
    result = self._lu.rpc.call_finalize_export(dest_node.name, instance,
1241
                                               self._snap_disks)
1242
    msg = result.fail_msg
1243
    fin_resu = not msg
1244
    if msg:
1245
      self._lu.LogWarning("Could not finalize export for instance %s"
1246
                          " on node %s: %s", instance.name, dest_node.name, msg)
1247

    
1248
    return (fin_resu, dresults)
1249

    
1250
  def RemoteExport(self, disk_info, key_name, dest_ca_pem, timeouts):
1251
    """Inter-cluster instance export.
1252

1253
    @type disk_info: list
1254
    @param disk_info: Per-disk destination information
1255
    @type key_name: string
1256
    @param key_name: Name of X509 key to use
1257
    @type dest_ca_pem: string
1258
    @param dest_ca_pem: Destination X509 CA in PEM format
1259
    @type timeouts: L{ImportExportTimeouts}
1260
    @param timeouts: Timeouts for this import
1261

1262
    """
1263
    instance = self._instance
1264

    
1265
    assert len(disk_info) == len(instance.disks)
1266

    
1267
    cbs = _RemoteExportCb(self._feedback_fn, len(instance.disks))
1268

    
1269
    ieloop = ImportExportLoop(self._lu)
1270
    try:
1271
      for idx, (dev, (host, port, magic)) in enumerate(zip(instance.disks,
1272
                                                           disk_info)):
1273
        opts = objects.ImportExportOptions(key_name=key_name,
1274
                                           ca_pem=dest_ca_pem,
1275
                                           magic=magic)
1276

    
1277
        self._feedback_fn("Sending disk %s to %s:%s" % (idx, host, port))
1278
        finished_fn = compat.partial(self._TransferFinished, idx)
1279
        ieloop.Add(DiskExport(self._lu, instance.primary_node,
1280
                              opts, host, port, instance,
1281
                              constants.IEIO_SCRIPT, (dev, idx),
1282
                              timeouts, cbs, private=(idx, finished_fn)))
1283

    
1284
      ieloop.Run()
1285
    finally:
1286
      ieloop.FinalizeAll()
1287

    
1288
    return (True, cbs.disk_results)
1289

    
1290
  def _TransferFinished(self, idx):
1291
    """Called once a transfer has finished.
1292

1293
    @type idx: number
1294
    @param idx: Disk index
1295

1296
    """
1297
    logging.debug("Transfer %s finished", idx)
1298
    self._RemoveSnapshot(idx)
1299

    
1300
  def Cleanup(self):
1301
    """Remove all snapshots.
1302

1303
    """
1304
    assert len(self._removed_snaps) == len(self._instance.disks)
1305
    for idx in range(len(self._instance.disks)):
1306
      self._RemoveSnapshot(idx)
1307

    
1308

    
1309
class _RemoteImportCb(ImportExportCbBase):
1310
  def __init__(self, feedback_fn, cds, x509_cert_pem, disk_count,
1311
               external_address):
1312
    """Initializes this class.
1313

1314
    @type cds: string
1315
    @param cds: Cluster domain secret
1316
    @type x509_cert_pem: string
1317
    @param x509_cert_pem: CA used for signing import key
1318
    @type disk_count: number
1319
    @param disk_count: Number of disks
1320
    @type external_address: string
1321
    @param external_address: External address of destination node
1322

1323
    """
1324
    ImportExportCbBase.__init__(self)
1325
    self._feedback_fn = feedback_fn
1326
    self._cds = cds
1327
    self._x509_cert_pem = x509_cert_pem
1328
    self._disk_count = disk_count
1329
    self._external_address = external_address
1330

    
1331
    self._dresults = [None] * disk_count
1332
    self._daemon_port = [None] * disk_count
1333

    
1334
    self._salt = utils.GenerateSecret(8)
1335

    
1336
  @property
1337
  def disk_results(self):
1338
    """Returns per-disk results.
1339

1340
    """
1341
    return self._dresults
1342

    
1343
  def _CheckAllListening(self):
1344
    """Checks whether all daemons are listening.
1345

1346
    If all daemons are listening, the information is sent to the client.
1347

1348
    """
1349
    if not compat.all(dp is not None for dp in self._daemon_port):
1350
      return
1351

    
1352
    host = self._external_address
1353

    
1354
    disks = []
1355
    for idx, (port, magic) in enumerate(self._daemon_port):
1356
      disks.append(ComputeRemoteImportDiskInfo(self._cds, self._salt,
1357
                                               idx, host, port, magic))
1358

    
1359
    assert len(disks) == self._disk_count
1360

    
1361
    self._feedback_fn(constants.ELOG_REMOTE_IMPORT, {
1362
      "disks": disks,
1363
      "x509_ca": self._x509_cert_pem,
1364
      })
1365

    
1366
  def ReportListening(self, ie, private):
1367
    """Called when daemon started listening.
1368

1369
    """
1370
    (idx, ) = private
1371

    
1372
    self._feedback_fn("Disk %s is now listening" % idx)
1373

    
1374
    assert self._daemon_port[idx] is None
1375

    
1376
    self._daemon_port[idx] = (ie.listen_port, ie.magic)
1377

    
1378
    self._CheckAllListening()
1379

    
1380
  def ReportConnected(self, ie, private):
1381
    """Called when a connection has been established.
1382

1383
    """
1384
    (idx, ) = private
1385

    
1386
    self._feedback_fn("Disk %s is now receiving data" % idx)
1387

    
1388
  def ReportFinished(self, ie, private):
1389
    """Called when a transfer has finished.
1390

1391
    """
1392
    (idx, ) = private
1393

    
1394
    # Daemon is certainly no longer listening
1395
    self._daemon_port[idx] = None
1396

    
1397
    if ie.success:
1398
      self._feedback_fn("Disk %s finished receiving data" % idx)
1399
    else:
1400
      self._feedback_fn(("Disk %s failed to receive data: %s"
1401
                         " (recent output: %r)") %
1402
                        (idx, ie.final_message, ie.recent_output))
1403

    
1404
    self._dresults[idx] = bool(ie.success)
1405

    
1406

    
1407
def RemoteImport(lu, feedback_fn, instance, source_x509_ca, cds, timeouts):
1408
  """Imports an instance from another cluster.
1409

1410
  @param lu: Logical unit instance
1411
  @param feedback_fn: Feedback function
1412
  @type instance: L{objects.Instance}
1413
  @param instance: Instance object
1414
  @type source_x509_ca: OpenSSL.crypto.X509
1415
  @param source_x509_ca: Import source's X509 CA
1416
  @type cds: string
1417
  @param cds: Cluster domain secret
1418
  @type timeouts: L{ImportExportTimeouts}
1419
  @param timeouts: Timeouts for this import
1420

1421
  """
1422
  source_ca_pem = OpenSSL.crypto.dump_certificate(OpenSSL.crypto.FILETYPE_PEM,
1423
                                                  source_x509_ca)
1424

    
1425
  magic_base = utils.GenerateSecret(6)
1426

    
1427
  # Create crypto key
1428
  result = lu.rpc.call_x509_cert_create(instance.primary_node,
1429
                                        constants.RIE_CERT_VALIDITY)
1430
  result.Raise("Can't create X509 key and certificate on %s" % result.node)
1431

    
1432
  (x509_key_name, x509_cert_pem) = result.payload
1433
  try:
1434
    # Load certificate
1435
    x509_cert = OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_PEM,
1436
                                                x509_cert_pem)
1437

    
1438
    # Sign certificate
1439
    signed_x509_cert_pem = \
1440
      utils.SignX509Certificate(x509_cert, cds, utils.GenerateSecret(8))
1441

    
1442
    cbs = _RemoteImportCb(feedback_fn, cds, signed_x509_cert_pem,
1443
                          len(instance.disks), instance.primary_node)
1444

    
1445
    ieloop = ImportExportLoop(lu)
1446
    try:
1447
      for idx, dev in enumerate(instance.disks):
1448
        magic = _GetInstDiskMagic(magic_base, instance.name, idx)
1449

    
1450
        # Import daemon options
1451
        opts = objects.ImportExportOptions(key_name=x509_key_name,
1452
                                           ca_pem=source_ca_pem,
1453
                                           magic=magic)
1454

    
1455
        ieloop.Add(DiskImport(lu, instance.primary_node, opts, instance,
1456
                              constants.IEIO_SCRIPT, (dev, idx),
1457
                              timeouts, cbs, private=(idx, )))
1458

    
1459
      ieloop.Run()
1460
    finally:
1461
      ieloop.FinalizeAll()
1462
  finally:
1463
    # Remove crypto key and certificate
1464
    result = lu.rpc.call_x509_cert_remove(instance.primary_node, x509_key_name)
1465
    result.Raise("Can't remove X509 key and certificate on %s" % result.node)
1466

    
1467
  return cbs.disk_results
1468

    
1469

    
1470
def _GetImportExportHandshakeMessage(version):
1471
  """Returns the handshake message for a RIE protocol version.
1472

1473
  @type version: number
1474

1475
  """
1476
  return "%s:%s" % (version, constants.RIE_HANDSHAKE)
1477

    
1478

    
1479
def ComputeRemoteExportHandshake(cds):
1480
  """Computes the remote import/export handshake.
1481

1482
  @type cds: string
1483
  @param cds: Cluster domain secret
1484

1485
  """
1486
  salt = utils.GenerateSecret(8)
1487
  msg = _GetImportExportHandshakeMessage(constants.RIE_VERSION)
1488
  return (constants.RIE_VERSION, utils.Sha1Hmac(cds, msg, salt=salt), salt)
1489

    
1490

    
1491
def CheckRemoteExportHandshake(cds, handshake):
1492
  """Checks the handshake of a remote import/export.
1493

1494
  @type cds: string
1495
  @param cds: Cluster domain secret
1496
  @type handshake: sequence
1497
  @param handshake: Handshake sent by remote peer
1498

1499
  """
1500
  try:
1501
    (version, hmac_digest, hmac_salt) = handshake
1502
  except (TypeError, ValueError), err:
1503
    return "Invalid data: %s" % err
1504

    
1505
  if not utils.VerifySha1Hmac(cds, _GetImportExportHandshakeMessage(version),
1506
                              hmac_digest, salt=hmac_salt):
1507
    return "Hash didn't match, clusters don't share the same domain secret"
1508

    
1509
  if version != constants.RIE_VERSION:
1510
    return ("Clusters don't have the same remote import/export protocol"
1511
            " (local=%s, remote=%s)" %
1512
            (constants.RIE_VERSION, version))
1513

    
1514
  return None
1515

    
1516

    
1517
def _GetRieDiskInfoMessage(disk_index, host, port, magic):
1518
  """Returns the hashed text for import/export disk information.
1519

1520
  @type disk_index: number
1521
  @param disk_index: Index of disk (included in hash)
1522
  @type host: string
1523
  @param host: Hostname
1524
  @type port: number
1525
  @param port: Daemon port
1526
  @type magic: string
1527
  @param magic: Magic value
1528

1529
  """
1530
  return "%s:%s:%s:%s" % (disk_index, host, port, magic)
1531

    
1532

    
1533
def CheckRemoteExportDiskInfo(cds, disk_index, disk_info):
1534
  """Verifies received disk information for an export.
1535

1536
  @type cds: string
1537
  @param cds: Cluster domain secret
1538
  @type disk_index: number
1539
  @param disk_index: Index of disk (included in hash)
1540
  @type disk_info: sequence
1541
  @param disk_info: Disk information sent by remote peer
1542

1543
  """
1544
  try:
1545
    (host, port, magic, hmac_digest, hmac_salt) = disk_info
1546
  except (TypeError, ValueError), err:
1547
    raise errors.GenericError("Invalid data: %s" % err)
1548

    
1549
  if not (host and port and magic):
1550
    raise errors.GenericError("Missing destination host, port or magic")
1551

    
1552
  msg = _GetRieDiskInfoMessage(disk_index, host, port, magic)
1553

    
1554
  if not utils.VerifySha1Hmac(cds, msg, hmac_digest, salt=hmac_salt):
1555
    raise errors.GenericError("HMAC is wrong")
1556

    
1557
  return (netutils.Hostname.GetNormalizedName(host),
1558
          utils.ValidateServiceName(port),
1559
          magic)
1560

    
1561

    
1562
def ComputeRemoteImportDiskInfo(cds, salt, disk_index, host, port, magic):
1563
  """Computes the signed disk information for a remote import.
1564

1565
  @type cds: string
1566
  @param cds: Cluster domain secret
1567
  @type salt: string
1568
  @param salt: HMAC salt
1569
  @type disk_index: number
1570
  @param disk_index: Index of disk (included in hash)
1571
  @type host: string
1572
  @param host: Hostname
1573
  @type port: number
1574
  @param port: Daemon port
1575
  @type magic: string
1576
  @param magic: Magic value
1577

1578
  """
1579
  msg = _GetRieDiskInfoMessage(disk_index, host, port, magic)
1580
  hmac_digest = utils.Sha1Hmac(cds, msg, salt=salt)
1581
  return (host, port, magic, hmac_digest, salt)