Statistics
| Branch: | Tag: | Revision:

root / lib / masterd / instance.py @ 4478301b

History | View | Annotate | Download (43.5 kB)

1
#
2
#
3

    
4
# Copyright (C) 2010 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Instance-related functions and classes for masterd.
23

24
"""
25

    
26
import logging
27
import time
28
import OpenSSL
29

    
30
from ganeti import constants
31
from ganeti import errors
32
from ganeti import compat
33
from ganeti import utils
34
from ganeti import objects
35
from ganeti import netutils
36

    
37

    
38
class _ImportExportError(Exception):
  """Local exception to report import/export errors.

  Raised by the helpers in this module and caught in
  L{ImportExportLoop}, which converts it into a failed transfer via
  C{Finalize(error=...)}; it is not meant to escape this module.

  """
42

    
43

    
44
class ImportExportTimeouts(object):
  """Bundle of the timeouts used while driving an import/export daemon.

  """
  #: Seconds the daemon may take before it writes its status file
  DEFAULT_READY_TIMEOUT = 10

  #: Seconds of consecutive errors tolerated before a hard failure
  DEFAULT_ERROR_TIMEOUT = 10

  #: Seconds within which the daemon must start listening
  DEFAULT_LISTEN_TIMEOUT = 10

  #: Seconds between progress reports
  DEFAULT_PROGRESS_INTERVAL = 60

  __slots__ = [
    "error",
    "ready",
    "listen",
    "connect",
    "progress",
    ]

  def __init__(self, connect,
               listen=DEFAULT_LISTEN_TIMEOUT,
               error=DEFAULT_ERROR_TIMEOUT,
               ready=DEFAULT_READY_TIMEOUT,
               progress=DEFAULT_PROGRESS_INTERVAL):
    """Initializes this class.

    @type connect: number
    @param connect: Timeout for establishing a connection
    @type listen: number
    @param listen: Timeout until the daemon must be listening
    @type error: number
    @param error: How long errors may persist before causing a hard failure
    @type ready: number
    @param ready: Timeout for the daemon to become ready
    @type progress: number
    @param progress: Interval between progress updates

    """
    self.connect = connect
    self.listen = listen
    self.ready = ready
    self.error = error
    self.progress = progress
89

    
90

    
91
class ImportExportCbBase(object):
  """Callbacks for disk import/export.

  All methods are no-ops by default; subclasses override the events
  they are interested in.

  """
  def ReportListening(self, ie, private):
    """Called when daemon started listening.

    @type ie: Subclass of L{_DiskImportExportBase}
    @param ie: Import/export object
    @param private: Private data passed to import/export object

    """

  def ReportConnected(self, ie, private):
    """Called when a connection has been established.

    @type ie: Subclass of L{_DiskImportExportBase}
    @param ie: Import/export object
    @param private: Private data passed to import/export object

    """

  def ReportProgress(self, ie, private):
    """Called when new progress information should be reported.

    @type ie: Subclass of L{_DiskImportExportBase}
    @param ie: Import/export object
    @param private: Private data passed to import/export object

    """

  def ReportFinished(self, ie, private):
    """Called when a transfer has finished.

    @type ie: Subclass of L{_DiskImportExportBase}
    @param ie: Import/export object
    @param private: Private data passed to import/export object

    """
130

    
131

    
132
def _TimeoutExpired(epoch, timeout, _time_fn=time.time):
133
  """Checks whether a timeout has expired.
134

135
  """
136
  return _time_fn() > (epoch + timeout)
137

    
138

    
139
class _DiskImportExportBase(object):
  """Base class managing one import/export daemon on a remote node.

  Drives the daemon through its life cycle: start via RPC, accept
  polled status data (fed in by L{ImportExportLoop}), notice the
  listening/connected/finished states, and finally clean the daemon up.
  Subclasses provide the mode-specific RPC (L{_StartDaemon}) and the
  listening/connect-epoch checks.

  """
  # Human-readable mode name for log messages; set by subclasses
  MODE_TEXT = None

  def __init__(self, lu, node_name, opts,
               instance, timeouts, cbs, private=None):
    """Initializes this class.

    @param lu: Logical unit instance
    @type node_name: string
    @param node_name: Node name for import
    @type opts: L{objects.ImportExportOptions}
    @param opts: Import/export daemon options
    @type instance: L{objects.Instance}
    @param instance: Instance object
    @type timeouts: L{ImportExportTimeouts}
    @param timeouts: Timeouts for this import
    @type cbs: L{ImportExportCbBase}
    @param cbs: Callbacks
    @param private: Private data for callback functions

    """
    assert self.MODE_TEXT

    self._lu = lu
    self.node_name = node_name
    # Copy of the options; connect_timeout is modified below
    self._opts = opts.Copy()
    self._instance = instance
    self._timeouts = timeouts
    self._cbs = cbs
    self._private = private

    # Set master daemon's timeout in options for import/export daemon
    assert self._opts.connect_timeout is None
    self._opts.connect_timeout = timeouts.connect

    # Parent loop
    self._loop = None

    # Timestamps
    self._ts_begin = None
    self._ts_connected = None
    self._ts_finished = None
    self._ts_cleanup = None
    self._ts_last_progress = None
    self._ts_last_error = None

    # Transfer status; success stays None while the transfer is active
    self.success = None
    self.final_message = None

    # Daemon status
    self._daemon_name = None
    self._daemon = None

  @property
  def recent_output(self):
    """Returns the most recent output from the daemon.

    @return: Output, or C{None} if no status data has been received yet

    """
    if self._daemon:
      return self._daemon.recent_output

    return None

  @property
  def progress(self):
    """Returns transfer progress information.

    @return: Tuple of (mbytes, throughput, percent, ETA), or C{None} if
      no daemon status is available yet

    """
    if not self._daemon:
      return None

    return (self._daemon.progress_mbytes,
            self._daemon.progress_throughput,
            self._daemon.progress_percent,
            self._daemon.progress_eta)

  @property
  def magic(self):
    """Returns the magic value for this import/export.

    """
    return self._opts.magic

  @property
  def active(self):
    """Determines whether this transport is still active.

    @rtype: bool

    """
    # success is only set once the transfer finished (or failed)
    return self.success is None

  @property
  def loop(self):
    """Returns parent loop.

    @rtype: L{ImportExportLoop}

    """
    return self._loop

  def SetLoop(self, loop):
    """Sets the parent loop.

    @type loop: L{ImportExportLoop}
    @raise errors.ProgrammerError: If a loop was already set

    """
    if self._loop:
      raise errors.ProgrammerError("Loop can only be set once")

    self._loop = loop

  def _StartDaemon(self):
    """Starts the import/export daemon.

    Must be implemented by subclasses; returns the RPC result object.

    """
    raise NotImplementedError()

  def CheckDaemon(self):
    """Checks whether daemon has been started and if not, starts it.

    @rtype: string
    @return: Daemon name
    @raise _ImportExportError: If the daemon could not be started

    """
    assert self._ts_cleanup is None

    if self._daemon_name is None:
      assert self._ts_begin is None

      result = self._StartDaemon()
      if result.fail_msg:
        raise _ImportExportError("Failed to start %s on %s: %s" %
                                 (self.MODE_TEXT, self.node_name,
                                  result.fail_msg))

      daemon_name = result.payload

      logging.info("Started %s %r on %s", self.MODE_TEXT, daemon_name,
                   self.node_name)

      # Remember the start time; used as the epoch for the ready and
      # listen timeout checks
      self._ts_begin = time.time()
      self._daemon_name = daemon_name

    return self._daemon_name

  def GetDaemonName(self):
    """Returns the daemon name.

    """
    assert self._daemon_name, "Daemon has not been started"
    assert self._ts_cleanup is None
    return self._daemon_name

  def Abort(self):
    """Sends SIGTERM to import/export daemon (if still active).

    @rtype: bool
    @return: Whether the abort RPC, if one was made, succeeded

    """
    if self._daemon_name:
      self._lu.LogWarning("Aborting %s %r on %s",
                          self.MODE_TEXT, self._daemon_name, self.node_name)
      result = self._lu.rpc.call_impexp_abort(self.node_name, self._daemon_name)
      if result.fail_msg:
        self._lu.LogWarning("Failed to abort %s %r on %s: %s",
                            self.MODE_TEXT, self._daemon_name,
                            self.node_name, result.fail_msg)
        return False

    return True

  def _SetDaemonData(self, data):
    """Internal function for updating status daemon data.

    @type data: L{objects.ImportExportStatus}
    @param data: Daemon status data
    @rtype: bool
    @return: Whether status data was available
    @raise _ImportExportError: If no data arrived within the ready timeout

    """
    assert self._ts_begin is not None

    if not data:
      # No status file yet; only fail once the ready timeout has passed
      if _TimeoutExpired(self._ts_begin, self._timeouts.ready):
        raise _ImportExportError("Didn't become ready after %s seconds" %
                                 self._timeouts.ready)

      return False

    self._daemon = data

    return True

  def SetDaemonData(self, success, data):
    """Updates daemon status data.

    @type success: bool
    @param success: Whether fetching data was successful or not
    @type data: L{objects.ImportExportStatus}
    @param data: Daemon status data
    @rtype: bool
    @return: Whether fresh status data is available

    """
    if not success:
      # Remember when errors started; give up only once they have
      # persisted longer than the error timeout
      if self._ts_last_error is None:
        self._ts_last_error = time.time()

      elif _TimeoutExpired(self._ts_last_error, self._timeouts.error):
        raise _ImportExportError("Too many errors while updating data")

      return False

    self._ts_last_error = None

    return self._SetDaemonData(data)

  def CheckListening(self):
    """Checks whether the daemon is listening.

    Implemented by subclasses; only imports actually listen.

    """
    raise NotImplementedError()

  def _GetConnectedCheckEpoch(self):
    """Returns timeout to calculate connect timeout.

    Implemented by subclasses.

    """
    raise NotImplementedError()

  def CheckConnected(self):
    """Checks whether the daemon is connected.

    @rtype: bool
    @return: Whether the daemon is connected
    @raise _ImportExportError: If the connect timeout expired

    """
    assert self._daemon, "Daemon status missing"

    # Only report the connection once
    if self._ts_connected is not None:
      return True

    if self._daemon.connected:
      self._ts_connected = time.time()

      # TODO: Log remote peer
      logging.debug("%s %r on %s is now connected",
                    self.MODE_TEXT, self._daemon_name, self.node_name)

      self._cbs.ReportConnected(self, self._private)

      return True

    if _TimeoutExpired(self._GetConnectedCheckEpoch(), self._timeouts.connect):
      raise _ImportExportError("Not connected after %s seconds" %
                               self._timeouts.connect)

    return False

  def _CheckProgress(self):
    """Checks whether a progress update should be reported.

    Reports at most once per progress interval, and only once the
    daemon has supplied both byte count and throughput.

    """
    if ((self._ts_last_progress is None or
         _TimeoutExpired(self._ts_last_progress, self._timeouts.progress)) and
        self._daemon and
        self._daemon.progress_mbytes is not None and
        self._daemon.progress_throughput is not None):
      self._cbs.ReportProgress(self, self._private)
      self._ts_last_progress = time.time()

  def CheckFinished(self):
    """Checks whether the daemon exited.

    @rtype: bool
    @return: Whether the transfer is finished

    """
    assert self._daemon, "Daemon status missing"

    if self._ts_finished:
      return True

    if self._daemon.exit_status is None:
      # Daemon still running; maybe report progress
      # TODO: Adjust delay for ETA expiring soon
      self._CheckProgress()
      return False

    self._ts_finished = time.time()

    self._ReportFinished(self._daemon.exit_status == 0,
                         self._daemon.error_message)

    return True

  def _ReportFinished(self, success, message):
    """Transfer is finished or daemon exited.

    @type success: bool
    @param success: Whether the transfer was successful
    @type message: string
    @param message: Error message

    """
    assert self.success is None

    self.success = success
    self.final_message = message

    if success:
      logging.info("%s %r on %s succeeded", self.MODE_TEXT, self._daemon_name,
                   self.node_name)
    elif self._daemon_name:
      self._lu.LogWarning("%s %r on %s failed: %s",
                          self.MODE_TEXT, self._daemon_name, self.node_name,
                          message)
    else:
      # Daemon was never (successfully) started
      self._lu.LogWarning("%s on %s failed: %s", self.MODE_TEXT,
                          self.node_name, message)

    self._cbs.ReportFinished(self, self._private)

  def _Finalize(self):
    """Makes the RPC call to finalize this import/export.

    """
    return self._lu.rpc.call_impexp_cleanup(self.node_name, self._daemon_name)

  def Finalize(self, error=None):
    """Finalizes this import/export.

    @param error: Optional error message to record as the final result
    @rtype: bool
    @return: Whether cleanup, if it was needed, succeeded

    """
    if self._daemon_name:
      logging.info("Finalizing %s %r on %s",
                   self.MODE_TEXT, self._daemon_name, self.node_name)

      result = self._Finalize()
      if result.fail_msg:
        self._lu.LogWarning("Failed to finalize %s %r on %s: %s",
                            self.MODE_TEXT, self._daemon_name,
                            self.node_name, result.fail_msg)
        return False

      # Daemon is no longer running
      self._daemon_name = None
      self._ts_cleanup = time.time()

    if error:
      self._ReportFinished(False, error)

    return True
483

    
484

    
485
class DiskImport(_DiskImportExportBase):
  """Wrapper around one disk import daemon (the receiving side).

  """
  MODE_TEXT = "import"

  def __init__(self, lu, node_name, opts, instance,
               dest, dest_args, timeouts, cbs, private=None):
    """Initializes this class.

    @param lu: Logical unit instance
    @type node_name: string
    @param node_name: Node name for import
    @type opts: L{objects.ImportExportOptions}
    @param opts: Import/export daemon options
    @type instance: L{objects.Instance}
    @param instance: Instance object
    @param dest: I/O destination
    @param dest_args: I/O arguments
    @type timeouts: L{ImportExportTimeouts}
    @param timeouts: Timeouts for this import
    @type cbs: L{ImportExportCbBase}
    @param cbs: Callbacks
    @param private: Private data for callback functions

    """
    _DiskImportExportBase.__init__(self, lu, node_name, opts,
                                   instance, timeouts, cbs, private)
    self._dest = dest
    self._dest_args = dest_args

    # Timestamps
    self._ts_listening = None

  @property
  def listen_port(self):
    """Returns the port the daemon is listening on.

    @return: Port number, or C{None} while daemon status is unavailable

    """
    if self._daemon:
      return self._daemon.listen_port

    return None

  def _StartDaemon(self):
    """Starts the import daemon.

    """
    return self._lu.rpc.call_import_start(self.node_name, self._opts,
                                          self._instance,
                                          self._dest, self._dest_args)

  def CheckListening(self):
    """Checks whether the daemon is listening.

    @rtype: bool
    @return: Whether the daemon is listening
    @raise _ImportExportError: If the listen timeout expired

    """
    assert self._daemon, "Daemon status missing"

    # Only report the listening state once
    if self._ts_listening is not None:
      return True

    port = self._daemon.listen_port
    if port is not None:
      self._ts_listening = time.time()

      logging.debug("Import %r on %s is now listening on port %s",
                    self._daemon_name, self.node_name, port)

      self._cbs.ReportListening(self, self._private)

      return True

    if _TimeoutExpired(self._ts_begin, self._timeouts.listen):
      raise _ImportExportError("Not listening after %s seconds" %
                               self._timeouts.listen)

    return False

  def _GetConnectedCheckEpoch(self):
    """Returns the time since we started listening.

    """
    assert self._ts_listening is not None, \
           ("Checking whether an import is connected is only useful"
            " once it's been listening")

    return self._ts_listening
572

    
573

    
574
class DiskExport(_DiskImportExportBase):
  """Wrapper around one disk export daemon (the sending side).

  """
  MODE_TEXT = "export"

  def __init__(self, lu, node_name, opts,
               dest_host, dest_port, instance, source, source_args,
               timeouts, cbs, private=None):
    """Sets up a disk export to a remote import daemon.

    @param lu: Logical unit instance
    @type node_name: string
    @param node_name: Name of the node exporting the data
    @type opts: L{objects.ImportExportOptions}
    @param opts: Options for the import/export daemon
    @type dest_host: string
    @param dest_host: Destination host name or IP address
    @type dest_port: number
    @param dest_port: Destination port number
    @type instance: L{objects.Instance}
    @param instance: Instance object
    @param source: I/O source
    @param source_args: I/O source arguments
    @type timeouts: L{ImportExportTimeouts}
    @param timeouts: Timeouts to apply
    @type cbs: L{ImportExportCbBase}
    @param cbs: Callback object
    @param private: Opaque data handed to the callbacks

    """
    _DiskImportExportBase.__init__(self, lu, node_name, opts,
                                   instance, timeouts, cbs, private)

    # Where the data is sent to
    self._dest_host = dest_host
    self._dest_port = dest_port

    # What is read on this node
    self._source = source
    self._source_args = source_args

  def _StartDaemon(self):
    """Launches the export daemon via RPC.

    """
    rpc = self._lu.rpc
    return rpc.call_export_start(self.node_name, self._opts,
                                 self._dest_host, self._dest_port,
                                 self._instance, self._source,
                                 self._source_args)

  def CheckListening(self):
    """Trivially succeeds; an export connects, it never listens.

    """
    # Only an import can be listening
    return True

  def _GetConnectedCheckEpoch(self):
    """Uses the daemon start time as the connect-timeout epoch.

    """
    assert self._ts_begin is not None

    return self._ts_begin
632

    
633

    
634
def FormatProgress(progress):
  """Formats transfer progress for display to the user.

  @type progress: tuple
  @param progress: Tuple of (mbytes, throughput, percent, eta) as
    returned by L{_DiskImportExportBase.progress}
  @rtype: string

  """
  mbytes, throughput, percent, eta = progress

  parts = [utils.FormatUnit(mbytes, "h")]

  # Not using FormatUnit as it doesn't support kilobytes
  parts.append("%0.1f MiB/s" % throughput)

  if percent is not None:
    parts.append("%d%%" % percent)

  if eta is not None:
    parts.append("ETA %s" % utils.FormatSeconds(eta))

  return utils.CommaJoin(parts)
654

    
655

    
656
class ImportExportLoop:
  """Main loop driving a set of import/export daemons to completion.

  """
  MIN_DELAY = 1.0
  MAX_DELAY = 20.0

  def __init__(self, lu):
    """Initializes this class.

    @param lu: Logical unit instance

    """
    self._lu = lu
    self._queue = []
    self._pending_add = []

  def Add(self, diskie):
    """Adds an import/export object to the loop.

    @type diskie: Subclass of L{_DiskImportExportBase}
    @param diskie: Import/export object

    """
    assert diskie not in self._pending_add
    assert diskie.loop is None

    diskie.SetLoop(self)

    # Adding new objects to a staging list is necessary, otherwise the main
    # loop gets confused if callbacks modify the queue while the main loop is
    # iterating over it.
    self._pending_add.append(diskie)

  @staticmethod
  def _CollectDaemonStatus(lu, daemons):
    """Collects the status for all import/export daemons.

    @param lu: Logical unit instance
    @type daemons: dict
    @param daemons: Mapping of node name to list of daemon names
    @rtype: dict
    @return: Mapping of node name to dict of daemon name to status;
      nodes whose status RPC failed are left out

    """
    daemon_status = {}

    for node_name, names in daemons.iteritems():
      result = lu.rpc.call_impexp_status(node_name, names)
      if result.fail_msg:
        lu.LogWarning("Failed to get daemon status on %s: %s",
                      node_name, result.fail_msg)
        continue

      assert len(names) == len(result.payload)

      daemon_status[node_name] = dict(zip(names, result.payload))

    return daemon_status

  @staticmethod
  def _GetActiveDaemonNames(queue):
    """Gets the names of all active daemons.

    Starts daemons that haven't been started yet; start failures are
    recorded by finalizing the object with the error.

    @rtype: dict
    @return: Mapping of node name to list of daemon names

    """
    result = {}
    for diskie in queue:
      if not diskie.active:
        continue

      try:
        # Start daemon if necessary
        daemon_name = diskie.CheckDaemon()
      except _ImportExportError, err:
        logging.exception("%s failed", diskie.MODE_TEXT)
        diskie.Finalize(error=str(err))
        continue

      result.setdefault(diskie.node_name, []).append(daemon_name)

    assert len(queue) >= len(result)
    assert len(queue) >= sum([len(names) for names in result.itervalues()])

    logging.debug("daemons=%r", result)

    return result

  def _AddPendingToQueue(self):
    """Adds all pending import/export objects to the internal queue.

    """
    assert compat.all(diskie not in self._queue and diskie.loop == self
                      for diskie in self._pending_add)

    self._queue.extend(self._pending_add)

    del self._pending_add[:]

  def Run(self):
    """Utility main loop.

    Repeatedly polls all active daemons, feeds the status data back into
    the import/export objects and sleeps an adaptive delay, until every
    transfer has finished.

    """
    while True:
      self._AddPendingToQueue()

      # Collect all active daemon names
      daemons = self._GetActiveDaemonNames(self._queue)
      if not daemons:
        break

      # Collection daemon status data
      data = self._CollectDaemonStatus(self._lu, daemons)

      # Use data
      delay = self.MAX_DELAY
      for diskie in self._queue:
        if not diskie.active:
          continue

        try:
          try:
            all_daemon_data = data[diskie.node_name]
          except KeyError:
            # Status request for the whole node failed
            result = diskie.SetDaemonData(False, None)
          else:
            result = \
              diskie.SetDaemonData(True,
                                   all_daemon_data[diskie.GetDaemonName()])

          if not result:
            # Daemon not yet ready, retry soon
            delay = min(3.0, delay)
            continue

          if diskie.CheckFinished():
            # Transfer finished
            diskie.Finalize()
            continue

          # Normal case: check again in 5 seconds
          delay = min(5.0, delay)

          if not diskie.CheckListening():
            # Not yet listening, retry soon
            delay = min(1.0, delay)
            continue

          if not diskie.CheckConnected():
            # Not yet connected, retry soon
            delay = min(1.0, delay)
            continue

        except _ImportExportError, err:
          logging.exception("%s failed", diskie.MODE_TEXT)
          diskie.Finalize(error=str(err))

      if not compat.any(diskie.active for diskie in self._queue):
        break

      # Wait a bit
      delay = min(self.MAX_DELAY, max(self.MIN_DELAY, delay))
      logging.debug("Waiting for %ss", delay)
      time.sleep(delay)

  def FinalizeAll(self):
    """Finalizes all pending transfers.

    @rtype: bool
    @return: Whether all transfers were finalized successfully

    """
    success = True

    for diskie in self._queue:
      success = diskie.Finalize() and success

    return success
819

    
820

    
821
class _TransferInstCbBase(ImportExportCbBase):
  def __init__(self, lu, feedback_fn, instance, timeouts, src_node, src_cbs,
               dest_node, dest_ip):
    """Stores the common context shared by instance transfer callbacks.

    """
    ImportExportCbBase.__init__(self)

    # General context
    self.lu = lu
    self.instance = instance
    self.timeouts = timeouts
    self.feedback_fn = feedback_fn

    # Transfer endpoints
    self.src_node = src_node
    self.src_cbs = src_cbs
    self.dest_node = dest_node
    self.dest_ip = dest_ip
837

    
838

    
839
class _TransferInstSourceCb(_TransferInstCbBase):
  """Callbacks for the sending (export) side of an instance transfer.

  """
  def ReportConnected(self, ie, dtp):
    """Called when a connection has been established.

    @param ie: The L{DiskExport} object
    @type dtp: L{_DiskTransferPrivate}
    @param dtp: Shared per-transfer state

    """
    assert self.src_cbs is None
    assert dtp.src_export == ie
    assert dtp.dest_import

    self.feedback_fn("%s is sending data on %s" %
                     (dtp.data.name, ie.node_name))

  def ReportProgress(self, ie, dtp):
    """Called when new progress information should be reported.

    """
    progress = ie.progress
    if not progress:
      return

    self.feedback_fn("%s sent %s" % (dtp.data.name, FormatProgress(progress)))

  def ReportFinished(self, ie, dtp):
    """Called when a transfer has finished.

    Records the result, runs the user-supplied finished callback and
    aborts the peer import on failure.

    """
    assert self.src_cbs is None
    assert dtp.src_export == ie
    assert dtp.dest_import

    if ie.success:
      self.feedback_fn("%s finished sending data" % dtp.data.name)
    else:
      self.feedback_fn("%s failed to send data: %s (recent output: %r)" %
                       (dtp.data.name, ie.final_message, ie.recent_output))

    dtp.RecordResult(ie.success)

    cb = dtp.data.finished_fn
    if cb:
      cb()

    # TODO: Check whether sending SIGTERM right away is okay, maybe we should
    # give the daemon a moment to sort things out
    if dtp.dest_import and not ie.success:
      dtp.dest_import.Abort()
885

    
886

    
887
class _TransferInstDestCb(_TransferInstCbBase):
  """Callbacks for the receiving (import) side of an instance transfer.

  """
  def ReportListening(self, ie, dtp):
    """Called when daemon started listening.

    Starts the corresponding export on the source node, now that the
    import's listen port is known.

    @param ie: The L{DiskImport} object
    @type dtp: L{_DiskTransferPrivate}
    @param dtp: Shared per-transfer state

    """
    assert self.src_cbs
    assert dtp.src_export is None
    assert dtp.dest_import
    assert dtp.export_opts

    self.feedback_fn("%s is now listening, starting export" % dtp.data.name)

    # Start export on source node
    de = DiskExport(self.lu, self.src_node, dtp.export_opts,
                    self.dest_ip, ie.listen_port,
                    self.instance, dtp.data.src_io, dtp.data.src_ioargs,
                    self.timeouts, self.src_cbs, private=dtp)
    ie.loop.Add(de)

    dtp.src_export = de

  def ReportConnected(self, ie, dtp):
    """Called when a connection has been established.

    """
    self.feedback_fn("%s is receiving data on %s" %
                     (dtp.data.name, self.dest_node))

  def ReportFinished(self, ie, dtp):
    """Called when a transfer has finished.

    Records the result and aborts the peer export on failure.

    """
    if ie.success:
      self.feedback_fn("%s finished receiving data" % dtp.data.name)
    else:
      self.feedback_fn("%s failed to receive data: %s (recent output: %r)" %
                       (dtp.data.name, ie.final_message, ie.recent_output))

    dtp.RecordResult(ie.success)

    # TODO: Check whether sending SIGTERM right away is okay, maybe we should
    # give the daemon a moment to sort things out
    if dtp.src_export and not ie.success:
      dtp.src_export.Abort()
931

    
932

    
933
class DiskTransfer(object):
  """Describes one disk transfer between two nodes.

  """
  def __init__(self, name, src_io, src_ioargs, dest_io, dest_ioargs,
               finished_fn):
    """Initializes this class.

    @type name: string
    @param name: User-visible name for this transfer (e.g. "disk/0")
    @param src_io: Source I/O type
    @param src_ioargs: Source I/O arguments
    @param dest_io: Destination I/O type
    @param dest_ioargs: Destination I/O arguments
    @type finished_fn: callable
    @param finished_fn: Function invoked once the transfer is done

    """
    self.name = name
    self.finished_fn = finished_fn

    # Source side
    self.src_io = src_io
    self.src_ioargs = src_ioargs

    # Destination side
    self.dest_io = dest_io
    self.dest_ioargs = dest_ioargs
957

    
958

    
959
class _DiskTransferPrivate(object):
960
  def __init__(self, data, success, export_opts):
961
    """Initializes this class.
962

963
    @type data: L{DiskTransfer}
964
    @type success: bool
965

966
    """
967
    self.data = data
968
    self.success = success
969
    self.export_opts = export_opts
970

    
971
    self.src_export = None
972
    self.dest_import = None
973

    
974
  def RecordResult(self, success):
975
    """Updates the status.
976

977
    One failed part will cause the whole transfer to fail.
978

979
    """
980
    self.success = self.success and success
981

    
982

    
983
def _GetInstDiskMagic(base, instance_name, index):
  """Computes the magic value for a disk export or import.

  @type base: string
  @param base: Random seed value (can be the same for all disks of a transfer)
  @type instance_name: string
  @param instance_name: Name of instance
  @type index: number
  @param index: Disk index
  @rtype: string
  @return: Hex digest identifying this particular disk transfer

  """
  digest = compat.sha1_hash()

  # Mix in protocol version, per-transfer seed, instance name and disk index
  for piece in (str(constants.RIE_VERSION), base, instance_name, str(index)):
    digest.update(piece)

  return digest.hexdigest()
1000

    
1001

    
1002
def TransferInstanceData(lu, feedback_fn, src_node, dest_node, dest_ip,
                         instance, all_transfers):
  """Transfers an instance's data from one node to another.

  @param lu: Logical unit instance
  @param feedback_fn: Feedback function
  @type src_node: string
  @param src_node: Source node name
  @type dest_node: string
  @param dest_node: Destination node name
  @type dest_ip: string
  @param dest_ip: IP address of destination node
  @type instance: L{objects.Instance}
  @param instance: Instance object
  @type all_transfers: list of L{DiskTransfer} instances
  @param all_transfers: List of all disk transfers to be made
  @rtype: list
  @return: List with a boolean (True=successful, False=failed) for success for
           each transfer

  """
  # Disable compression for all moves as these are all within the same cluster
  compress = constants.IEC_NONE

  logging.debug("Source node %s, destination node %s, compression '%s'",
                src_node, dest_node, compress)

  timeouts = ImportExportTimeouts(constants.DISK_TRANSFER_CONNECT_TIMEOUT)
  src_cbs = _TransferInstSourceCb(lu, feedback_fn, instance, timeouts,
                                  src_node, None, dest_node, dest_ip)
  dest_cbs = _TransferInstDestCb(lu, feedback_fn, instance, timeouts,
                                 src_node, src_cbs, dest_node, dest_ip)

  all_dtp = []

  # Random seed shared by all per-disk magic values of this transfer
  base_magic = utils.GenerateSecret(6)

  ieloop = ImportExportLoop(lu)
  try:
    for idx, transfer in enumerate(all_transfers):
      if transfer:
        feedback_fn("Exporting %s from %s to %s" %
                    (transfer.name, src_node, dest_node))

        magic = _GetInstDiskMagic(base_magic, instance.name, idx)
        opts = objects.ImportExportOptions(key_name=None, ca_pem=None,
                                           compress=compress, magic=magic)

        dtp = _DiskTransferPrivate(transfer, True, opts)

        # The source-side export is created by the destination callbacks once
        # the import daemon is listening; only the import is added here
        di = DiskImport(lu, dest_node, opts, instance,
                        transfer.dest_io, transfer.dest_ioargs,
                        timeouts, dest_cbs, private=dtp)
        ieloop.Add(di)

        dtp.dest_import = di
      else:
        # Placeholder for disks that are not transferred
        dtp = _DiskTransferPrivate(None, False, None)

      all_dtp.append(dtp)

    ieloop.Run()
  finally:
    ieloop.FinalizeAll()

  assert len(all_dtp) == len(all_transfers)
  assert compat.all((dtp.src_export is None or
                      dtp.src_export.success is not None) and
                     (dtp.dest_import is None or
                      dtp.dest_import.success is not None)
                     for dtp in all_dtp), \
         "Not all imports/exports are finalized"

  return [bool(dtp.success) for dtp in all_dtp]
class _RemoteExportCb(ImportExportCbBase):
  """Callbacks for disk exports to a remote cluster.

  """
  def __init__(self, feedback_fn, disk_count):
    """Initializes this class.

    @param feedback_fn: Feedback function
    @type disk_count: number
    @param disk_count: Number of disks to be exported

    """
    ImportExportCbBase.__init__(self)
    self._feedback_fn = feedback_fn
    # Per-disk result; None until the corresponding transfer finishes
    self._dresults = [None] * disk_count

  @property
  def disk_results(self):
    """Returns per-disk results.

    """
    return self._dresults

  def ReportConnected(self, ie, private):
    """Called when a connection has been established.

    """
    (idx, _) = private

    self._feedback_fn("Disk %s is now sending data" % idx)

  def ReportProgress(self, ie, private):
    """Called when new progress information should be reported.

    """
    (idx, _) = private

    progress = ie.progress
    if not progress:
      return

    self._feedback_fn("Disk %s sent %s" % (idx, FormatProgress(progress)))

  def ReportFinished(self, ie, private):
    """Called when a transfer has finished.

    """
    (idx, finished_fn) = private

    if ie.success:
      self._feedback_fn("Disk %s finished sending data" % idx)
    else:
      self._feedback_fn("Disk %s failed to send data: %s (recent output: %r)" %
                        (idx, ie.final_message, ie.recent_output))

    self._dresults[idx] = bool(ie.success)

    if finished_fn:
      finished_fn()
class ExportInstanceHelper(object):
  """Utilities for exporting an instance, locally or to a remote cluster.

  Note: made a new-style class for consistency with the other classes in
  this module; the external interface is unchanged.

  """
  def __init__(self, lu, feedback_fn, instance):
    """Initializes this class.

    @param lu: Logical unit instance
    @param feedback_fn: Feedback function
    @type instance: L{objects.Instance}
    @param instance: Instance object

    """
    self._lu = lu
    self._feedback_fn = feedback_fn
    self._instance = instance

    # Snapshot disk objects (or False on snapshot failure), one per disk
    self._snap_disks = []
    # Tracks which snapshots have already been removed
    self._removed_snaps = [False] * len(instance.disks)

  def CreateSnapshots(self):
    """Creates an LVM snapshot for every disk of the instance.

    """
    assert not self._snap_disks

    instance = self._instance
    src_node = instance.primary_node

    vgname = self._lu.cfg.GetVGName()

    for idx, disk in enumerate(instance.disks):
      self._feedback_fn("Creating a snapshot of disk/%s on node %s" %
                        (idx, src_node))

      # result.payload will be a snapshot of an lvm leaf of the one we
      # passed
      result = self._lu.rpc.call_blockdev_snapshot(src_node, disk)
      msg = result.fail_msg
      if msg:
        self._lu.LogWarning("Could not snapshot disk/%s on node %s: %s",
                            idx, src_node, msg)
        new_dev = False
      else:
        disk_id = (vgname, result.payload)
        new_dev = objects.Disk(dev_type=constants.LD_LV, size=disk.size,
                               logical_id=disk_id, physical_id=disk_id,
                               iv_name=disk.iv_name)

      self._snap_disks.append(new_dev)

    assert len(self._snap_disks) == len(instance.disks)
    assert len(self._removed_snaps) == len(instance.disks)

  def _RemoveSnapshot(self, disk_index):
    """Removes an LVM snapshot.

    @type disk_index: number
    @param disk_index: Index of the snapshot to be removed

    """
    disk = self._snap_disks[disk_index]
    if disk and not self._removed_snaps[disk_index]:
      src_node = self._instance.primary_node

      self._feedback_fn("Removing snapshot of disk/%s on node %s" %
                        (disk_index, src_node))

      result = self._lu.rpc.call_blockdev_remove(src_node, disk)
      if result.fail_msg:
        self._lu.LogWarning("Could not remove snapshot for disk/%d from node"
                            " %s: %s", disk_index, src_node, result.fail_msg)
      else:
        self._removed_snaps[disk_index] = True

  def LocalExport(self, dest_node):
    """Intra-cluster instance export.

    @type dest_node: L{objects.Node}
    @param dest_node: Destination node

    """
    instance = self._instance
    src_node = instance.primary_node

    assert len(self._snap_disks) == len(instance.disks)

    transfers = []

    for idx, dev in enumerate(self._snap_disks):
      if not dev:
        # Snapshot creation failed for this disk; keep list positions aligned
        transfers.append(None)
        continue

      path = utils.PathJoin(constants.EXPORT_DIR, "%s.new" % instance.name,
                            dev.physical_id[1])

      finished_fn = compat.partial(self._TransferFinished, idx)

      # FIXME: pass debug option from opcode to backend
      dt = DiskTransfer("snapshot/%s" % idx,
                        constants.IEIO_SCRIPT, (dev, idx),
                        constants.IEIO_FILE, (path, ),
                        finished_fn)
      transfers.append(dt)

    # Actually export data
    dresults = TransferInstanceData(self._lu, self._feedback_fn,
                                    src_node, dest_node.name,
                                    dest_node.secondary_ip,
                                    instance, transfers)

    assert len(dresults) == len(instance.disks)

    self._feedback_fn("Finalizing export on %s" % dest_node.name)
    result = self._lu.rpc.call_finalize_export(dest_node.name, instance,
                                               self._snap_disks)
    msg = result.fail_msg
    fin_resu = not msg
    if msg:
      self._lu.LogWarning("Could not finalize export for instance %s"
                          " on node %s: %s", instance.name, dest_node.name, msg)

    return (fin_resu, dresults)

  def RemoteExport(self, disk_info, key_name, dest_ca_pem, timeouts):
    """Inter-cluster instance export.

    @type disk_info: list
    @param disk_info: Per-disk destination information
    @type key_name: string
    @param key_name: Name of X509 key to use
    @type dest_ca_pem: string
    @param dest_ca_pem: Destination X509 CA in PEM format
    @type timeouts: L{ImportExportTimeouts}
    @param timeouts: Timeouts for this import

    """
    instance = self._instance

    assert len(disk_info) == len(instance.disks)

    cbs = _RemoteExportCb(self._feedback_fn, len(instance.disks))

    ieloop = ImportExportLoop(self._lu)
    try:
      for idx, (dev, (host, port, magic)) in enumerate(zip(instance.disks,
                                                           disk_info)):
        opts = objects.ImportExportOptions(key_name=key_name,
                                           ca_pem=dest_ca_pem,
                                           magic=magic)

        self._feedback_fn("Sending disk %s to %s:%s" % (idx, host, port))
        finished_fn = compat.partial(self._TransferFinished, idx)
        ieloop.Add(DiskExport(self._lu, instance.primary_node,
                              opts, host, port, instance,
                              constants.IEIO_SCRIPT, (dev, idx),
                              timeouts, cbs, private=(idx, finished_fn)))

      ieloop.Run()
    finally:
      ieloop.FinalizeAll()

    return (True, cbs.disk_results)

  def _TransferFinished(self, idx):
    """Called once a transfer has finished.

    @type idx: number
    @param idx: Disk index

    """
    logging.debug("Transfer %s finished", idx)
    self._RemoveSnapshot(idx)

  def Cleanup(self):
    """Remove all snapshots.

    """
    assert len(self._removed_snaps) == len(self._instance.disks)
    for idx in range(len(self._instance.disks)):
      self._RemoveSnapshot(idx)
class _RemoteImportCb(ImportExportCbBase):
  """Callbacks for disk imports from a remote cluster.

  """
  def __init__(self, feedback_fn, cds, x509_cert_pem, disk_count,
               external_address):
    """Initializes this class.

    @type cds: string
    @param cds: Cluster domain secret
    @type x509_cert_pem: string
    @param x509_cert_pem: CA used for signing import key
    @type disk_count: number
    @param disk_count: Number of disks
    @type external_address: string
    @param external_address: External address of destination node

    """
    ImportExportCbBase.__init__(self)
    self._feedback_fn = feedback_fn
    self._cds = cds
    self._x509_cert_pem = x509_cert_pem
    self._disk_count = disk_count
    self._external_address = external_address

    # Per-disk result; None until the corresponding transfer finishes
    self._dresults = [None] * disk_count
    # (port, magic) per disk while its daemon is listening, None otherwise
    self._daemon_port = [None] * disk_count

    self._salt = utils.GenerateSecret(8)

  @property
  def disk_results(self):
    """Returns per-disk results.

    """
    return self._dresults

  def _CheckAllListening(self):
    """Checks whether all daemons are listening.

    If all daemons are listening, the information is sent to the client.

    """
    if not compat.all(dp is not None for dp in self._daemon_port):
      return

    host = self._external_address

    disks = []
    for idx, (port, magic) in enumerate(self._daemon_port):
      disks.append(ComputeRemoteImportDiskInfo(self._cds, self._salt,
                                               idx, host, port, magic))

    assert len(disks) == self._disk_count

    self._feedback_fn(constants.ELOG_REMOTE_IMPORT, {
      "disks": disks,
      "x509_ca": self._x509_cert_pem,
      })

  def ReportListening(self, ie, private):
    """Called when daemon started listening.

    """
    (idx, ) = private

    self._feedback_fn("Disk %s is now listening" % idx)

    assert self._daemon_port[idx] is None

    self._daemon_port[idx] = (ie.listen_port, ie.magic)

    self._CheckAllListening()

  def ReportConnected(self, ie, private):
    """Called when a connection has been established.

    """
    (idx, ) = private

    self._feedback_fn("Disk %s is now receiving data" % idx)

  def ReportFinished(self, ie, private):
    """Called when a transfer has finished.

    """
    (idx, ) = private

    # Daemon is certainly no longer listening
    self._daemon_port[idx] = None

    if ie.success:
      self._feedback_fn("Disk %s finished receiving data" % idx)
    else:
      self._feedback_fn(("Disk %s failed to receive data: %s"
                         " (recent output: %r)") %
                        (idx, ie.final_message, ie.recent_output))

    self._dresults[idx] = bool(ie.success)
def RemoteImport(lu, feedback_fn, instance, source_x509_ca, cds, timeouts):
  """Imports an instance from another cluster.

  @param lu: Logical unit instance
  @param feedback_fn: Feedback function
  @type instance: L{objects.Instance}
  @param instance: Instance object
  @type source_x509_ca: OpenSSL.crypto.X509
  @param source_x509_ca: Import source's X509 CA
  @type cds: string
  @param cds: Cluster domain secret
  @type timeouts: L{ImportExportTimeouts}
  @param timeouts: Timeouts for this import

  """
  source_ca_pem = OpenSSL.crypto.dump_certificate(OpenSSL.crypto.FILETYPE_PEM,
                                                  source_x509_ca)

  magic_base = utils.GenerateSecret(6)

  # Create crypto key
  result = lu.rpc.call_x509_cert_create(instance.primary_node,
                                        constants.RIE_CERT_VALIDITY)
  result.Raise("Can't create X509 key and certificate on %s" % result.node)

  (x509_key_name, x509_cert_pem) = result.payload
  try:
    # Load certificate
    x509_cert = OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_PEM,
                                                x509_cert_pem)

    # Sign certificate with the cluster domain secret so the source cluster
    # can verify it
    signed_x509_cert_pem = \
      utils.SignX509Certificate(x509_cert, cds, utils.GenerateSecret(8))

    cbs = _RemoteImportCb(feedback_fn, cds, signed_x509_cert_pem,
                          len(instance.disks), instance.primary_node)

    ieloop = ImportExportLoop(lu)
    try:
      for idx, dev in enumerate(instance.disks):
        magic = _GetInstDiskMagic(magic_base, instance.name, idx)

        # Import daemon options
        opts = objects.ImportExportOptions(key_name=x509_key_name,
                                           ca_pem=source_ca_pem,
                                           magic=magic)

        ieloop.Add(DiskImport(lu, instance.primary_node, opts, instance,
                              constants.IEIO_SCRIPT, (dev, idx),
                              timeouts, cbs, private=(idx, )))

      ieloop.Run()
    finally:
      ieloop.FinalizeAll()
  finally:
    # Remove crypto key and certificate
    result = lu.rpc.call_x509_cert_remove(instance.primary_node, x509_key_name)
    result.Raise("Can't remove X509 key and certificate on %s" % result.node)

  return cbs.disk_results
def _GetImportExportHandshakeMessage(version):
  """Returns the handshake message for a RIE protocol version.

  @type version: number
  @param version: Remote import/export protocol version
  @rtype: string

  """
  return "%s:%s" % (version, constants.RIE_HANDSHAKE)
1483
def ComputeRemoteExportHandshake(cds):
  """Computes the remote import/export handshake.

  @type cds: string
  @param cds: Cluster domain secret
  @rtype: tuple
  @return: Protocol version, HMAC over the handshake message, and the salt

  """
  salt = utils.GenerateSecret(8)
  msg = _GetImportExportHandshakeMessage(constants.RIE_VERSION)
  return (constants.RIE_VERSION, utils.Sha1Hmac(cds, msg, salt=salt), salt)
def CheckRemoteExportHandshake(cds, handshake):
  """Checks the handshake of a remote import/export.

  @type cds: string
  @param cds: Cluster domain secret
  @type handshake: sequence
  @param handshake: Handshake sent by remote peer
  @rtype: string or None
  @return: Error description on failure, C{None} if the handshake is valid

  """
  try:
    (version, hmac_digest, hmac_salt) = handshake
  # "except X as err" instead of the Python-2-only "except X, err"
  except (TypeError, ValueError) as err:
    return "Invalid data: %s" % err

  if not utils.VerifySha1Hmac(cds, _GetImportExportHandshakeMessage(version),
                              hmac_digest, salt=hmac_salt):
    return "Hash didn't match, clusters don't share the same domain secret"

  if version != constants.RIE_VERSION:
    return ("Clusters don't have the same remote import/export protocol"
            " (local=%s, remote=%s)" %
            (constants.RIE_VERSION, version))

  return None
def _GetRieDiskInfoMessage(disk_index, host, port, magic):
1522
  """Returns the hashed text for import/export disk information.
1523

1524
  @type disk_index: number
1525
  @param disk_index: Index of disk (included in hash)
1526
  @type host: string
1527
  @param host: Hostname
1528
  @type port: number
1529
  @param port: Daemon port
1530
  @type magic: string
1531
  @param magic: Magic value
1532

1533
  """
1534
  return "%s:%s:%s:%s" % (disk_index, host, port, magic)
1535

    
1536

    
1537
def CheckRemoteExportDiskInfo(cds, disk_index, disk_info):
  """Verifies received disk information for an export.

  @type cds: string
  @param cds: Cluster domain secret
  @type disk_index: number
  @param disk_index: Index of disk (included in hash)
  @type disk_info: sequence
  @param disk_info: Disk information sent by remote peer
  @rtype: tuple
  @return: Normalized hostname, validated port and magic value
  @raise errors.GenericError: If the disk information is malformed or the
    HMAC does not verify

  """
  try:
    (host, port, magic, hmac_digest, hmac_salt) = disk_info
  # "except X as err" instead of the Python-2-only "except X, err"
  except (TypeError, ValueError) as err:
    raise errors.GenericError("Invalid data: %s" % err)

  if not (host and port and magic):
    raise errors.GenericError("Missing destination host, port or magic")

  msg = _GetRieDiskInfoMessage(disk_index, host, port, magic)

  if not utils.VerifySha1Hmac(cds, msg, hmac_digest, salt=hmac_salt):
    raise errors.GenericError("HMAC is wrong")

  return (netutils.Hostname.GetNormalizedName(host),
          utils.ValidateServiceName(port),
          magic)
def ComputeRemoteImportDiskInfo(cds, salt, disk_index, host, port, magic):
  """Computes the signed disk information for a remote import.

  @type cds: string
  @param cds: Cluster domain secret
  @type salt: string
  @param salt: HMAC salt
  @type disk_index: number
  @param disk_index: Index of disk (included in hash)
  @type host: string
  @param host: Hostname
  @type port: number
  @param port: Daemon port
  @type magic: string
  @param magic: Magic value
  @rtype: tuple
  @return: Disk information with HMAC digest and salt appended, suitable for
    verification by L{CheckRemoteExportDiskInfo}

  """
  msg = _GetRieDiskInfoMessage(disk_index, host, port, magic)
  hmac_digest = utils.Sha1Hmac(cds, msg, salt=salt)
  return (host, port, magic, hmac_digest, salt)