Statistics
| Branch: | Tag: | Revision:

root / lib / masterd / instance.py @ 194e8648

History | View | Annotate | Download (44.2 kB)

1
#
2
#
3

    
4
# Copyright (C) 2010, 2011 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Instance-related functions and classes for masterd.
23

24
"""
25

    
26
import logging
27
import time
28
import OpenSSL
29

    
30
from ganeti import constants
31
from ganeti import errors
32
from ganeti import compat
33
from ganeti import utils
34
from ganeti import objects
35
from ganeti import netutils
36

    
37

    
38
class _ImportExportError(Exception):
39
  """Local exception to report import/export errors.
40

41
  """
42

    
43

    
44
class ImportExportTimeouts(object):
45
  #: Time until daemon starts writing status file
46
  DEFAULT_READY_TIMEOUT = 10
47

    
48
  #: Length of time until errors cause hard failure
49
  DEFAULT_ERROR_TIMEOUT = 10
50

    
51
  #: Time after which daemon must be listening
52
  DEFAULT_LISTEN_TIMEOUT = 10
53

    
54
  #: Progress update interval
55
  DEFAULT_PROGRESS_INTERVAL = 60
56

    
57
  __slots__ = [
58
    "error",
59
    "ready",
60
    "listen",
61
    "connect",
62
    "progress",
63
    ]
64

    
65
  def __init__(self, connect,
66
               listen=DEFAULT_LISTEN_TIMEOUT,
67
               error=DEFAULT_ERROR_TIMEOUT,
68
               ready=DEFAULT_READY_TIMEOUT,
69
               progress=DEFAULT_PROGRESS_INTERVAL):
70
    """Initializes this class.
71

72
    @type connect: number
73
    @param connect: Timeout for establishing connection
74
    @type listen: number
75
    @param listen: Timeout for starting to listen for connections
76
    @type error: number
77
    @param error: Length of time until errors cause hard failure
78
    @type ready: number
79
    @param ready: Timeout for daemon to become ready
80
    @type progress: number
81
    @param progress: Progress update interval
82

83
    """
84
    self.error = error
85
    self.ready = ready
86
    self.listen = listen
87
    self.connect = connect
88
    self.progress = progress
89

    
90

    
91
class ImportExportCbBase(object):
92
  """Callbacks for disk import/export.
93

94
  """
95
  def ReportListening(self, ie, private):
96
    """Called when daemon started listening.
97

98
    @type ie: Subclass of L{_DiskImportExportBase}
99
    @param ie: Import/export object
100
    @param private: Private data passed to import/export object
101

102
    """
103

    
104
  def ReportConnected(self, ie, private):
105
    """Called when a connection has been established.
106

107
    @type ie: Subclass of L{_DiskImportExportBase}
108
    @param ie: Import/export object
109
    @param private: Private data passed to import/export object
110

111
    """
112

    
113
  def ReportProgress(self, ie, private):
114
    """Called when new progress information should be reported.
115

116
    @type ie: Subclass of L{_DiskImportExportBase}
117
    @param ie: Import/export object
118
    @param private: Private data passed to import/export object
119

120
    """
121

    
122
  def ReportFinished(self, ie, private):
123
    """Called when a transfer has finished.
124

125
    @type ie: Subclass of L{_DiskImportExportBase}
126
    @param ie: Import/export object
127
    @param private: Private data passed to import/export object
128

129
    """
130

    
131

    
132
def _TimeoutExpired(epoch, timeout, _time_fn=time.time):
133
  """Checks whether a timeout has expired.
134

135
  """
136
  return _time_fn() > (epoch + timeout)
137

    
138

    
139
class _DiskImportExportBase(object):
140
  MODE_TEXT = None
141

    
142
  def __init__(self, lu, node_name, opts,
143
               instance, timeouts, cbs, private=None):
144
    """Initializes this class.
145

146
    @param lu: Logical unit instance
147
    @type node_name: string
148
    @param node_name: Node name for import
149
    @type opts: L{objects.ImportExportOptions}
150
    @param opts: Import/export daemon options
151
    @type instance: L{objects.Instance}
152
    @param instance: Instance object
153
    @type timeouts: L{ImportExportTimeouts}
154
    @param timeouts: Timeouts for this import
155
    @type cbs: L{ImportExportCbBase}
156
    @param cbs: Callbacks
157
    @param private: Private data for callback functions
158

159
    """
160
    assert self.MODE_TEXT
161

    
162
    self._lu = lu
163
    self.node_name = node_name
164
    self._opts = opts.Copy()
165
    self._instance = instance
166
    self._timeouts = timeouts
167
    self._cbs = cbs
168
    self._private = private
169

    
170
    # Set master daemon's timeout in options for import/export daemon
171
    assert self._opts.connect_timeout is None
172
    self._opts.connect_timeout = timeouts.connect
173

    
174
    # Parent loop
175
    self._loop = None
176

    
177
    # Timestamps
178
    self._ts_begin = None
179
    self._ts_connected = None
180
    self._ts_finished = None
181
    self._ts_cleanup = None
182
    self._ts_last_progress = None
183
    self._ts_last_error = None
184

    
185
    # Transfer status
186
    self.success = None
187
    self.final_message = None
188

    
189
    # Daemon status
190
    self._daemon_name = None
191
    self._daemon = None
192

    
193
  @property
194
  def recent_output(self):
195
    """Returns the most recent output from the daemon.
196

197
    """
198
    if self._daemon:
199
      return "\n".join(self._daemon.recent_output)
200

    
201
    return None
202

    
203
  @property
204
  def progress(self):
205
    """Returns transfer progress information.
206

207
    """
208
    if not self._daemon:
209
      return None
210

    
211
    return (self._daemon.progress_mbytes,
212
            self._daemon.progress_throughput,
213
            self._daemon.progress_percent,
214
            self._daemon.progress_eta)
215

    
216
  @property
217
  def magic(self):
218
    """Returns the magic value for this import/export.
219

220
    """
221
    return self._opts.magic
222

    
223
  @property
224
  def active(self):
225
    """Determines whether this transport is still active.
226

227
    """
228
    return self.success is None
229

    
230
  @property
231
  def loop(self):
232
    """Returns parent loop.
233

234
    @rtype: L{ImportExportLoop}
235

236
    """
237
    return self._loop
238

    
239
  def SetLoop(self, loop):
240
    """Sets the parent loop.
241

242
    @type loop: L{ImportExportLoop}
243

244
    """
245
    if self._loop:
246
      raise errors.ProgrammerError("Loop can only be set once")
247

    
248
    self._loop = loop
249

    
250
  def _StartDaemon(self):
251
    """Starts the import/export daemon.
252

253
    """
254
    raise NotImplementedError()
255

    
256
  def CheckDaemon(self):
257
    """Checks whether daemon has been started and if not, starts it.
258

259
    @rtype: string
260
    @return: Daemon name
261

262
    """
263
    assert self._ts_cleanup is None
264

    
265
    if self._daemon_name is None:
266
      assert self._ts_begin is None
267

    
268
      result = self._StartDaemon()
269
      if result.fail_msg:
270
        raise _ImportExportError("Failed to start %s on %s: %s" %
271
                                 (self.MODE_TEXT, self.node_name,
272
                                  result.fail_msg))
273

    
274
      daemon_name = result.payload
275

    
276
      logging.info("Started %s '%s' on %s", self.MODE_TEXT, daemon_name,
277
                   self.node_name)
278

    
279
      self._ts_begin = time.time()
280
      self._daemon_name = daemon_name
281

    
282
    return self._daemon_name
283

    
284
  def GetDaemonName(self):
285
    """Returns the daemon name.
286

287
    """
288
    assert self._daemon_name, "Daemon has not been started"
289
    assert self._ts_cleanup is None
290
    return self._daemon_name
291

    
292
  def Abort(self):
293
    """Sends SIGTERM to import/export daemon (if still active).
294

295
    """
296
    if self._daemon_name:
297
      self._lu.LogWarning("Aborting %s '%s' on %s",
298
                          self.MODE_TEXT, self._daemon_name, self.node_name)
299
      result = self._lu.rpc.call_impexp_abort(self.node_name, self._daemon_name)
300
      if result.fail_msg:
301
        self._lu.LogWarning("Failed to abort %s '%s' on %s: %s",
302
                            self.MODE_TEXT, self._daemon_name,
303
                            self.node_name, result.fail_msg)
304
        return False
305

    
306
    return True
307

    
308
  def _SetDaemonData(self, data):
309
    """Internal function for updating status daemon data.
310

311
    @type data: L{objects.ImportExportStatus}
312
    @param data: Daemon status data
313

314
    """
315
    assert self._ts_begin is not None
316

    
317
    if not data:
318
      if _TimeoutExpired(self._ts_begin, self._timeouts.ready):
319
        raise _ImportExportError("Didn't become ready after %s seconds" %
320
                                 self._timeouts.ready)
321

    
322
      return False
323

    
324
    self._daemon = data
325

    
326
    return True
327

    
328
  def SetDaemonData(self, success, data):
329
    """Updates daemon status data.
330

331
    @type success: bool
332
    @param success: Whether fetching data was successful or not
333
    @type data: L{objects.ImportExportStatus}
334
    @param data: Daemon status data
335

336
    """
337
    if not success:
338
      if self._ts_last_error is None:
339
        self._ts_last_error = time.time()
340

    
341
      elif _TimeoutExpired(self._ts_last_error, self._timeouts.error):
342
        raise _ImportExportError("Too many errors while updating data")
343

    
344
      return False
345

    
346
    self._ts_last_error = None
347

    
348
    return self._SetDaemonData(data)
349

    
350
  def CheckListening(self):
351
    """Checks whether the daemon is listening.
352

353
    """
354
    raise NotImplementedError()
355

    
356
  def _GetConnectedCheckEpoch(self):
357
    """Returns timeout to calculate connect timeout.
358

359
    """
360
    raise NotImplementedError()
361

    
362
  def CheckConnected(self):
363
    """Checks whether the daemon is connected.
364

365
    @rtype: bool
366
    @return: Whether the daemon is connected
367

368
    """
369
    assert self._daemon, "Daemon status missing"
370

    
371
    if self._ts_connected is not None:
372
      return True
373

    
374
    if self._daemon.connected:
375
      self._ts_connected = time.time()
376

    
377
      # TODO: Log remote peer
378
      logging.debug("%s '%s' on %s is now connected",
379
                    self.MODE_TEXT, self._daemon_name, self.node_name)
380

    
381
      self._cbs.ReportConnected(self, self._private)
382

    
383
      return True
384

    
385
    if _TimeoutExpired(self._GetConnectedCheckEpoch(), self._timeouts.connect):
386
      raise _ImportExportError("Not connected after %s seconds" %
387
                               self._timeouts.connect)
388

    
389
    return False
390

    
391
  def _CheckProgress(self):
392
    """Checks whether a progress update should be reported.
393

394
    """
395
    if ((self._ts_last_progress is None or
396
         _TimeoutExpired(self._ts_last_progress, self._timeouts.progress)) and
397
        self._daemon and
398
        self._daemon.progress_mbytes is not None and
399
        self._daemon.progress_throughput is not None):
400
      self._cbs.ReportProgress(self, self._private)
401
      self._ts_last_progress = time.time()
402

    
403
  def CheckFinished(self):
404
    """Checks whether the daemon exited.
405

406
    @rtype: bool
407
    @return: Whether the transfer is finished
408

409
    """
410
    assert self._daemon, "Daemon status missing"
411

    
412
    if self._ts_finished:
413
      return True
414

    
415
    if self._daemon.exit_status is None:
416
      # TODO: Adjust delay for ETA expiring soon
417
      self._CheckProgress()
418
      return False
419

    
420
    self._ts_finished = time.time()
421

    
422
    self._ReportFinished(self._daemon.exit_status == 0,
423
                         self._daemon.error_message)
424

    
425
    return True
426

    
427
  def _ReportFinished(self, success, message):
428
    """Transfer is finished or daemon exited.
429

430
    @type success: bool
431
    @param success: Whether the transfer was successful
432
    @type message: string
433
    @param message: Error message
434

435
    """
436
    assert self.success is None
437

    
438
    self.success = success
439
    self.final_message = message
440

    
441
    if success:
442
      logging.info("%s '%s' on %s succeeded", self.MODE_TEXT,
443
                   self._daemon_name, self.node_name)
444
    elif self._daemon_name:
445
      self._lu.LogWarning("%s '%s' on %s failed: %s",
446
                          self.MODE_TEXT, self._daemon_name, self.node_name,
447
                          message)
448
    else:
449
      self._lu.LogWarning("%s on %s failed: %s", self.MODE_TEXT,
450
                          self.node_name, message)
451

    
452
    self._cbs.ReportFinished(self, self._private)
453

    
454
  def _Finalize(self):
455
    """Makes the RPC call to finalize this import/export.
456

457
    """
458
    return self._lu.rpc.call_impexp_cleanup(self.node_name, self._daemon_name)
459

    
460
  def Finalize(self, error=None):
461
    """Finalizes this import/export.
462

463
    """
464
    if self._daemon_name:
465
      logging.info("Finalizing %s '%s' on %s",
466
                   self.MODE_TEXT, self._daemon_name, self.node_name)
467

    
468
      result = self._Finalize()
469
      if result.fail_msg:
470
        self._lu.LogWarning("Failed to finalize %s '%s' on %s: %s",
471
                            self.MODE_TEXT, self._daemon_name,
472
                            self.node_name, result.fail_msg)
473
        return False
474

    
475
      # Daemon is no longer running
476
      self._daemon_name = None
477
      self._ts_cleanup = time.time()
478

    
479
    if error:
480
      self._ReportFinished(False, error)
481

    
482
    return True
483

    
484

    
485
class DiskImport(_DiskImportExportBase):
486
  MODE_TEXT = "import"
487

    
488
  def __init__(self, lu, node_name, opts, instance,
489
               dest, dest_args, timeouts, cbs, private=None):
490
    """Initializes this class.
491

492
    @param lu: Logical unit instance
493
    @type node_name: string
494
    @param node_name: Node name for import
495
    @type opts: L{objects.ImportExportOptions}
496
    @param opts: Import/export daemon options
497
    @type instance: L{objects.Instance}
498
    @param instance: Instance object
499
    @param dest: I/O destination
500
    @param dest_args: I/O arguments
501
    @type timeouts: L{ImportExportTimeouts}
502
    @param timeouts: Timeouts for this import
503
    @type cbs: L{ImportExportCbBase}
504
    @param cbs: Callbacks
505
    @param private: Private data for callback functions
506

507
    """
508
    _DiskImportExportBase.__init__(self, lu, node_name, opts,
509
                                   instance, timeouts, cbs, private)
510
    self._dest = dest
511
    self._dest_args = dest_args
512

    
513
    # Timestamps
514
    self._ts_listening = None
515

    
516
  @property
517
  def listen_port(self):
518
    """Returns the port the daemon is listening on.
519

520
    """
521
    if self._daemon:
522
      return self._daemon.listen_port
523

    
524
    return None
525

    
526
  def _StartDaemon(self):
527
    """Starts the import daemon.
528

529
    """
530
    return self._lu.rpc.call_import_start(self.node_name, self._opts,
531
                                          self._instance,
532
                                          self._dest, self._dest_args)
533

    
534
  def CheckListening(self):
535
    """Checks whether the daemon is listening.
536

537
    @rtype: bool
538
    @return: Whether the daemon is listening
539

540
    """
541
    assert self._daemon, "Daemon status missing"
542

    
543
    if self._ts_listening is not None:
544
      return True
545

    
546
    port = self._daemon.listen_port
547
    if port is not None:
548
      self._ts_listening = time.time()
549

    
550
      logging.debug("Import '%s' on %s is now listening on port %s",
551
                    self._daemon_name, self.node_name, port)
552

    
553
      self._cbs.ReportListening(self, self._private)
554

    
555
      return True
556

    
557
    if _TimeoutExpired(self._ts_begin, self._timeouts.listen):
558
      raise _ImportExportError("Not listening after %s seconds" %
559
                               self._timeouts.listen)
560

    
561
    return False
562

    
563
  def _GetConnectedCheckEpoch(self):
564
    """Returns the time since we started listening.
565

566
    """
567
    assert self._ts_listening is not None, \
568
           ("Checking whether an import is connected is only useful"
569
            " once it's been listening")
570

    
571
    return self._ts_listening
572

    
573

    
574
class DiskExport(_DiskImportExportBase):
575
  MODE_TEXT = "export"
576

    
577
  def __init__(self, lu, node_name, opts,
578
               dest_host, dest_port, instance, source, source_args,
579
               timeouts, cbs, private=None):
580
    """Initializes this class.
581

582
    @param lu: Logical unit instance
583
    @type node_name: string
584
    @param node_name: Node name for import
585
    @type opts: L{objects.ImportExportOptions}
586
    @param opts: Import/export daemon options
587
    @type dest_host: string
588
    @param dest_host: Destination host name or IP address
589
    @type dest_port: number
590
    @param dest_port: Destination port number
591
    @type instance: L{objects.Instance}
592
    @param instance: Instance object
593
    @param source: I/O source
594
    @param source_args: I/O source
595
    @type timeouts: L{ImportExportTimeouts}
596
    @param timeouts: Timeouts for this import
597
    @type cbs: L{ImportExportCbBase}
598
    @param cbs: Callbacks
599
    @param private: Private data for callback functions
600

601
    """
602
    _DiskImportExportBase.__init__(self, lu, node_name, opts,
603
                                   instance, timeouts, cbs, private)
604
    self._dest_host = dest_host
605
    self._dest_port = dest_port
606
    self._source = source
607
    self._source_args = source_args
608

    
609
  def _StartDaemon(self):
610
    """Starts the export daemon.
611

612
    """
613
    return self._lu.rpc.call_export_start(self.node_name, self._opts,
614
                                          self._dest_host, self._dest_port,
615
                                          self._instance, self._source,
616
                                          self._source_args)
617

    
618
  def CheckListening(self):
619
    """Checks whether the daemon is listening.
620

621
    """
622
    # Only an import can be listening
623
    return True
624

    
625
  def _GetConnectedCheckEpoch(self):
626
    """Returns the time since the daemon started.
627

628
    """
629
    assert self._ts_begin is not None
630

    
631
    return self._ts_begin
632

    
633

    
634
def FormatProgress(progress):
635
  """Formats progress information for user consumption
636

637
  """
638
  (mbytes, throughput, percent, eta) = progress
639

    
640
  parts = [
641
    utils.FormatUnit(mbytes, "h"),
642

    
643
    # Not using FormatUnit as it doesn't support kilobytes
644
    "%0.1f MiB/s" % throughput,
645
    ]
646

    
647
  if percent is not None:
648
    parts.append("%d%%" % percent)
649

    
650
  if eta is not None:
651
    parts.append("ETA %s" % utils.FormatSeconds(eta))
652

    
653
  return utils.CommaJoin(parts)
654

    
655

    
656
class ImportExportLoop:
657
  MIN_DELAY = 1.0
658
  MAX_DELAY = 20.0
659

    
660
  def __init__(self, lu):
661
    """Initializes this class.
662

663
    """
664
    self._lu = lu
665
    self._queue = []
666
    self._pending_add = []
667

    
668
  def Add(self, diskie):
669
    """Adds an import/export object to the loop.
670

671
    @type diskie: Subclass of L{_DiskImportExportBase}
672
    @param diskie: Import/export object
673

674
    """
675
    assert diskie not in self._pending_add
676
    assert diskie.loop is None
677

    
678
    diskie.SetLoop(self)
679

    
680
    # Adding new objects to a staging list is necessary, otherwise the main
681
    # loop gets confused if callbacks modify the queue while the main loop is
682
    # iterating over it.
683
    self._pending_add.append(diskie)
684

    
685
  @staticmethod
686
  def _CollectDaemonStatus(lu, daemons):
687
    """Collects the status for all import/export daemons.
688

689
    """
690
    daemon_status = {}
691

    
692
    for node_name, names in daemons.iteritems():
693
      result = lu.rpc.call_impexp_status(node_name, names)
694
      if result.fail_msg:
695
        lu.LogWarning("Failed to get daemon status on %s: %s",
696
                      node_name, result.fail_msg)
697
        continue
698

    
699
      assert len(names) == len(result.payload)
700

    
701
      daemon_status[node_name] = dict(zip(names, result.payload))
702

    
703
    return daemon_status
704

    
705
  @staticmethod
706
  def _GetActiveDaemonNames(queue):
707
    """Gets the names of all active daemons.
708

709
    """
710
    result = {}
711
    for diskie in queue:
712
      if not diskie.active:
713
        continue
714

    
715
      try:
716
        # Start daemon if necessary
717
        daemon_name = diskie.CheckDaemon()
718
      except _ImportExportError, err:
719
        logging.exception("%s failed", diskie.MODE_TEXT)
720
        diskie.Finalize(error=str(err))
721
        continue
722

    
723
      result.setdefault(diskie.node_name, []).append(daemon_name)
724

    
725
    assert len(queue) >= len(result)
726
    assert len(queue) >= sum([len(names) for names in result.itervalues()])
727

    
728
    logging.debug("daemons=%r", result)
729

    
730
    return result
731

    
732
  def _AddPendingToQueue(self):
733
    """Adds all pending import/export objects to the internal queue.
734

735
    """
736
    assert compat.all(diskie not in self._queue and diskie.loop == self
737
                      for diskie in self._pending_add)
738

    
739
    self._queue.extend(self._pending_add)
740

    
741
    del self._pending_add[:]
742

    
743
  def Run(self):
744
    """Utility main loop.
745

746
    """
747
    while True:
748
      self._AddPendingToQueue()
749

    
750
      # Collect all active daemon names
751
      daemons = self._GetActiveDaemonNames(self._queue)
752
      if not daemons:
753
        break
754

    
755
      # Collection daemon status data
756
      data = self._CollectDaemonStatus(self._lu, daemons)
757

    
758
      # Use data
759
      delay = self.MAX_DELAY
760
      for diskie in self._queue:
761
        if not diskie.active:
762
          continue
763

    
764
        try:
765
          try:
766
            all_daemon_data = data[diskie.node_name]
767
          except KeyError:
768
            result = diskie.SetDaemonData(False, None)
769
          else:
770
            result = \
771
              diskie.SetDaemonData(True,
772
                                   all_daemon_data[diskie.GetDaemonName()])
773

    
774
          if not result:
775
            # Daemon not yet ready, retry soon
776
            delay = min(3.0, delay)
777
            continue
778

    
779
          if diskie.CheckFinished():
780
            # Transfer finished
781
            diskie.Finalize()
782
            continue
783

    
784
          # Normal case: check again in 5 seconds
785
          delay = min(5.0, delay)
786

    
787
          if not diskie.CheckListening():
788
            # Not yet listening, retry soon
789
            delay = min(1.0, delay)
790
            continue
791

    
792
          if not diskie.CheckConnected():
793
            # Not yet connected, retry soon
794
            delay = min(1.0, delay)
795
            continue
796

    
797
        except _ImportExportError, err:
798
          logging.exception("%s failed", diskie.MODE_TEXT)
799
          diskie.Finalize(error=str(err))
800

    
801
      if not compat.any(diskie.active for diskie in self._queue):
802
        break
803

    
804
      # Wait a bit
805
      delay = min(self.MAX_DELAY, max(self.MIN_DELAY, delay))
806
      logging.debug("Waiting for %ss", delay)
807
      time.sleep(delay)
808

    
809
  def FinalizeAll(self):
810
    """Finalizes all pending transfers.
811

812
    """
813
    success = True
814

    
815
    for diskie in self._queue:
816
      success = diskie.Finalize() and success
817

    
818
    return success
819

    
820

    
821
class _TransferInstCbBase(ImportExportCbBase):
822
  def __init__(self, lu, feedback_fn, instance, timeouts, src_node, src_cbs,
823
               dest_node, dest_ip):
824
    """Initializes this class.
825

826
    """
827
    ImportExportCbBase.__init__(self)
828

    
829
    self.lu = lu
830
    self.feedback_fn = feedback_fn
831
    self.instance = instance
832
    self.timeouts = timeouts
833
    self.src_node = src_node
834
    self.src_cbs = src_cbs
835
    self.dest_node = dest_node
836
    self.dest_ip = dest_ip
837

    
838

    
839
class _TransferInstSourceCb(_TransferInstCbBase):
840
  def ReportConnected(self, ie, dtp):
841
    """Called when a connection has been established.
842

843
    """
844
    assert self.src_cbs is None
845
    assert dtp.src_export == ie
846
    assert dtp.dest_import
847

    
848
    self.feedback_fn("%s is sending data on %s" %
849
                     (dtp.data.name, ie.node_name))
850

    
851
  def ReportProgress(self, ie, dtp):
852
    """Called when new progress information should be reported.
853

854
    """
855
    progress = ie.progress
856
    if not progress:
857
      return
858

    
859
    self.feedback_fn("%s sent %s" % (dtp.data.name, FormatProgress(progress)))
860

    
861
  def ReportFinished(self, ie, dtp):
862
    """Called when a transfer has finished.
863

864
    """
865
    assert self.src_cbs is None
866
    assert dtp.src_export == ie
867
    assert dtp.dest_import
868

    
869
    if ie.success:
870
      self.feedback_fn("%s finished sending data" % dtp.data.name)
871
    else:
872
      self.feedback_fn("%s failed to send data: %s (recent output: %s)" %
873
                       (dtp.data.name, ie.final_message, ie.recent_output))
874

    
875
    dtp.RecordResult(ie.success)
876

    
877
    cb = dtp.data.finished_fn
878
    if cb:
879
      cb()
880

    
881
    # TODO: Check whether sending SIGTERM right away is okay, maybe we should
882
    # give the daemon a moment to sort things out
883
    if dtp.dest_import and not ie.success:
884
      dtp.dest_import.Abort()
885

    
886

    
887
class _TransferInstDestCb(_TransferInstCbBase):
888
  def ReportListening(self, ie, dtp):
889
    """Called when daemon started listening.
890

891
    """
892
    assert self.src_cbs
893
    assert dtp.src_export is None
894
    assert dtp.dest_import
895
    assert dtp.export_opts
896

    
897
    self.feedback_fn("%s is now listening, starting export" % dtp.data.name)
898

    
899
    # Start export on source node
900
    de = DiskExport(self.lu, self.src_node, dtp.export_opts,
901
                    self.dest_ip, ie.listen_port,
902
                    self.instance, dtp.data.src_io, dtp.data.src_ioargs,
903
                    self.timeouts, self.src_cbs, private=dtp)
904
    ie.loop.Add(de)
905

    
906
    dtp.src_export = de
907

    
908
  def ReportConnected(self, ie, dtp):
909
    """Called when a connection has been established.
910

911
    """
912
    self.feedback_fn("%s is receiving data on %s" %
913
                     (dtp.data.name, self.dest_node))
914

    
915
  def ReportFinished(self, ie, dtp):
916
    """Called when a transfer has finished.
917

918
    """
919
    if ie.success:
920
      self.feedback_fn("%s finished receiving data" % dtp.data.name)
921
    else:
922
      self.feedback_fn("%s failed to receive data: %s (recent output: %s)" %
923
                       (dtp.data.name, ie.final_message, ie.recent_output))
924

    
925
    dtp.RecordResult(ie.success)
926

    
927
    # TODO: Check whether sending SIGTERM right away is okay, maybe we should
928
    # give the daemon a moment to sort things out
929
    if dtp.src_export and not ie.success:
930
      dtp.src_export.Abort()
931

    
932

    
933
class DiskTransfer(object):
934
  def __init__(self, name, src_io, src_ioargs, dest_io, dest_ioargs,
935
               finished_fn):
936
    """Initializes this class.
937

938
    @type name: string
939
    @param name: User-visible name for this transfer (e.g. "disk/0")
940
    @param src_io: Source I/O type
941
    @param src_ioargs: Source I/O arguments
942
    @param dest_io: Destination I/O type
943
    @param dest_ioargs: Destination I/O arguments
944
    @type finished_fn: callable
945
    @param finished_fn: Function called once transfer has finished
946

947
    """
948
    self.name = name
949

    
950
    self.src_io = src_io
951
    self.src_ioargs = src_ioargs
952

    
953
    self.dest_io = dest_io
954
    self.dest_ioargs = dest_ioargs
955

    
956
    self.finished_fn = finished_fn
957

    
958

    
959
class _DiskTransferPrivate(object):
960
  def __init__(self, data, success, export_opts):
961
    """Initializes this class.
962

963
    @type data: L{DiskTransfer}
964
    @type success: bool
965

966
    """
967
    self.data = data
968
    self.success = success
969
    self.export_opts = export_opts
970

    
971
    self.src_export = None
972
    self.dest_import = None
973

    
974
  def RecordResult(self, success):
975
    """Updates the status.
976

977
    One failed part will cause the whole transfer to fail.
978

979
    """
980
    self.success = self.success and success
981

    
982

    
983
def _GetInstDiskMagic(base, instance_name, index):
984
  """Computes the magic value for a disk export or import.
985

986
  @type base: string
987
  @param base: Random seed value (can be the same for all disks of a transfer)
988
  @type instance_name: string
989
  @param instance_name: Name of instance
990
  @type index: number
991
  @param index: Disk index
992

993
  """
994
  h = compat.sha1_hash()
995
  h.update(str(constants.RIE_VERSION))
996
  h.update(base)
997
  h.update(instance_name)
998
  h.update(str(index))
999
  return h.hexdigest()
1000

    
1001

    
1002
def TransferInstanceData(lu, feedback_fn, src_node, dest_node, dest_ip,
1003
                         instance, all_transfers):
1004
  """Transfers an instance's data from one node to another.
1005

1006
  @param lu: Logical unit instance
1007
  @param feedback_fn: Feedback function
1008
  @type src_node: string
1009
  @param src_node: Source node name
1010
  @type dest_node: string
1011
  @param dest_node: Destination node name
1012
  @type dest_ip: string
1013
  @param dest_ip: IP address of destination node
1014
  @type instance: L{objects.Instance}
1015
  @param instance: Instance object
1016
  @type all_transfers: list of L{DiskTransfer} instances
1017
  @param all_transfers: List of all disk transfers to be made
1018
  @rtype: list
1019
  @return: List with a boolean (True=successful, False=failed) for success for
1020
           each transfer
1021

1022
  """
1023
  # Disable compression for all moves as these are all within the same cluster
1024
  compress = constants.IEC_NONE
1025

    
1026
  logging.debug("Source node %s, destination node %s, compression '%s'",
1027
                src_node, dest_node, compress)
1028

    
1029
  timeouts = ImportExportTimeouts(constants.DISK_TRANSFER_CONNECT_TIMEOUT)
1030
  src_cbs = _TransferInstSourceCb(lu, feedback_fn, instance, timeouts,
1031
                                  src_node, None, dest_node, dest_ip)
1032
  dest_cbs = _TransferInstDestCb(lu, feedback_fn, instance, timeouts,
1033
                                 src_node, src_cbs, dest_node, dest_ip)
1034

    
1035
  all_dtp = []
1036

    
1037
  base_magic = utils.GenerateSecret(6)
1038

    
1039
  ieloop = ImportExportLoop(lu)
1040
  try:
1041
    for idx, transfer in enumerate(all_transfers):
1042
      if transfer:
1043
        feedback_fn("Exporting %s from %s to %s" %
1044
                    (transfer.name, src_node, dest_node))
1045

    
1046
        magic = _GetInstDiskMagic(base_magic, instance.name, idx)
1047
        opts = objects.ImportExportOptions(key_name=None, ca_pem=None,
1048
                                           compress=compress, magic=magic)
1049

    
1050
        dtp = _DiskTransferPrivate(transfer, True, opts)
1051

    
1052
        di = DiskImport(lu, dest_node, opts, instance,
1053
                        transfer.dest_io, transfer.dest_ioargs,
1054
                        timeouts, dest_cbs, private=dtp)
1055
        ieloop.Add(di)
1056

    
1057
        dtp.dest_import = di
1058
      else:
1059
        dtp = _DiskTransferPrivate(None, False, None)
1060

    
1061
      all_dtp.append(dtp)
1062

    
1063
    ieloop.Run()
1064
  finally:
1065
    ieloop.FinalizeAll()
1066

    
1067
  assert len(all_dtp) == len(all_transfers)
1068
  assert compat.all((dtp.src_export is None or
1069
                      dtp.src_export.success is not None) and
1070
                     (dtp.dest_import is None or
1071
                      dtp.dest_import.success is not None)
1072
                     for dtp in all_dtp), \
1073
         "Not all imports/exports are finalized"
1074

    
1075
  return [bool(dtp.success) for dtp in all_dtp]
1076

    
1077

    
1078
class _RemoteExportCb(ImportExportCbBase):
1079
  def __init__(self, feedback_fn, disk_count):
1080
    """Initializes this class.
1081

1082
    """
1083
    ImportExportCbBase.__init__(self)
1084
    self._feedback_fn = feedback_fn
1085
    self._dresults = [None] * disk_count
1086

    
1087
  @property
1088
  def disk_results(self):
1089
    """Returns per-disk results.
1090

1091
    """
1092
    return self._dresults
1093

    
1094
  def ReportConnected(self, ie, private):
1095
    """Called when a connection has been established.
1096

1097
    """
1098
    (idx, _) = private
1099

    
1100
    self._feedback_fn("Disk %s is now sending data" % idx)
1101

    
1102
  def ReportProgress(self, ie, private):
1103
    """Called when new progress information should be reported.
1104

1105
    """
1106
    (idx, _) = private
1107

    
1108
    progress = ie.progress
1109
    if not progress:
1110
      return
1111

    
1112
    self._feedback_fn("Disk %s sent %s" % (idx, FormatProgress(progress)))
1113

    
1114
  def ReportFinished(self, ie, private):
1115
    """Called when a transfer has finished.
1116

1117
    """
1118
    (idx, finished_fn) = private
1119

    
1120
    if ie.success:
1121
      self._feedback_fn("Disk %s finished sending data" % idx)
1122
    else:
1123
      self._feedback_fn("Disk %s failed to send data: %s (recent output: %s)" %
1124
                        (idx, ie.final_message, ie.recent_output))
1125

    
1126
    self._dresults[idx] = bool(ie.success)
1127

    
1128
    if finished_fn:
1129
      finished_fn()
1130

    
1131

    
1132
class ExportInstanceHelper:
1133
  def __init__(self, lu, feedback_fn, instance):
1134
    """Initializes this class.
1135

1136
    @param lu: Logical unit instance
1137
    @param feedback_fn: Feedback function
1138
    @type instance: L{objects.Instance}
1139
    @param instance: Instance object
1140

1141
    """
1142
    self._lu = lu
1143
    self._feedback_fn = feedback_fn
1144
    self._instance = instance
1145

    
1146
    self._snap_disks = []
1147
    self._removed_snaps = [False] * len(instance.disks)
1148

    
1149
  def CreateSnapshots(self):
1150
    """Creates an LVM snapshot for every disk of the instance.
1151

1152
    """
1153
    assert not self._snap_disks
1154

    
1155
    instance = self._instance
1156
    src_node = instance.primary_node
1157

    
1158
    for idx, disk in enumerate(instance.disks):
1159
      self._feedback_fn("Creating a snapshot of disk/%s on node %s" %
1160
                        (idx, src_node))
1161

    
1162
      # result.payload will be a snapshot of an lvm leaf of the one we
1163
      # passed
1164
      result = self._lu.rpc.call_blockdev_snapshot(src_node, disk)
1165
      new_dev = False
1166
      msg = result.fail_msg
1167
      if msg:
1168
        self._lu.LogWarning("Could not snapshot disk/%s on node %s: %s",
1169
                            idx, src_node, msg)
1170
      elif (not isinstance(result.payload, (tuple, list)) or
1171
            len(result.payload) != 2):
1172
        self._lu.LogWarning("Could not snapshot disk/%s on node %s: invalid"
1173
                            " result '%s'", idx, src_node, result.payload)
1174
      else:
1175
        disk_id = tuple(result.payload)
1176
        new_dev = objects.Disk(dev_type=constants.LD_LV, size=disk.size,
1177
                               logical_id=disk_id, physical_id=disk_id,
1178
                               iv_name=disk.iv_name)
1179

    
1180
      self._snap_disks.append(new_dev)
1181

    
1182
    assert len(self._snap_disks) == len(instance.disks)
1183
    assert len(self._removed_snaps) == len(instance.disks)
1184

    
1185
  def _RemoveSnapshot(self, disk_index):
1186
    """Removes an LVM snapshot.
1187

1188
    @type disk_index: number
1189
    @param disk_index: Index of the snapshot to be removed
1190

1191
    """
1192
    disk = self._snap_disks[disk_index]
1193
    if disk and not self._removed_snaps[disk_index]:
1194
      src_node = self._instance.primary_node
1195

    
1196
      self._feedback_fn("Removing snapshot of disk/%s on node %s" %
1197
                        (disk_index, src_node))
1198

    
1199
      result = self._lu.rpc.call_blockdev_remove(src_node, disk)
1200
      if result.fail_msg:
1201
        self._lu.LogWarning("Could not remove snapshot for disk/%d from node"
1202
                            " %s: %s", disk_index, src_node, result.fail_msg)
1203
      else:
1204
        self._removed_snaps[disk_index] = True
1205

    
1206
  def LocalExport(self, dest_node):
1207
    """Intra-cluster instance export.
1208

1209
    @type dest_node: L{objects.Node}
1210
    @param dest_node: Destination node
1211

1212
    """
1213
    instance = self._instance
1214
    src_node = instance.primary_node
1215

    
1216
    assert len(self._snap_disks) == len(instance.disks)
1217

    
1218
    transfers = []
1219

    
1220
    for idx, dev in enumerate(self._snap_disks):
1221
      if not dev:
1222
        transfers.append(None)
1223
        continue
1224

    
1225
      path = utils.PathJoin(constants.EXPORT_DIR, "%s.new" % instance.name,
1226
                            dev.physical_id[1])
1227

    
1228
      finished_fn = compat.partial(self._TransferFinished, idx)
1229

    
1230
      # FIXME: pass debug option from opcode to backend
1231
      dt = DiskTransfer("snapshot/%s" % idx,
1232
                        constants.IEIO_SCRIPT, (dev, idx),
1233
                        constants.IEIO_FILE, (path, ),
1234
                        finished_fn)
1235
      transfers.append(dt)
1236

    
1237
    # Actually export data
1238
    dresults = TransferInstanceData(self._lu, self._feedback_fn,
1239
                                    src_node, dest_node.name,
1240
                                    dest_node.secondary_ip,
1241
                                    instance, transfers)
1242

    
1243
    assert len(dresults) == len(instance.disks)
1244

    
1245
    self._feedback_fn("Finalizing export on %s" % dest_node.name)
1246
    result = self._lu.rpc.call_finalize_export(dest_node.name, instance,
1247
                                               self._snap_disks)
1248
    msg = result.fail_msg
1249
    fin_resu = not msg
1250
    if msg:
1251
      self._lu.LogWarning("Could not finalize export for instance %s"
1252
                          " on node %s: %s", instance.name, dest_node.name, msg)
1253

    
1254
    return (fin_resu, dresults)
1255

    
1256
  def RemoteExport(self, disk_info, key_name, dest_ca_pem, timeouts):
1257
    """Inter-cluster instance export.
1258

1259
    @type disk_info: list
1260
    @param disk_info: Per-disk destination information
1261
    @type key_name: string
1262
    @param key_name: Name of X509 key to use
1263
    @type dest_ca_pem: string
1264
    @param dest_ca_pem: Destination X509 CA in PEM format
1265
    @type timeouts: L{ImportExportTimeouts}
1266
    @param timeouts: Timeouts for this import
1267

1268
    """
1269
    instance = self._instance
1270

    
1271
    assert len(disk_info) == len(instance.disks)
1272

    
1273
    cbs = _RemoteExportCb(self._feedback_fn, len(instance.disks))
1274

    
1275
    ieloop = ImportExportLoop(self._lu)
1276
    try:
1277
      for idx, (dev, (host, port, magic)) in enumerate(zip(instance.disks,
1278
                                                           disk_info)):
1279
        # Decide whether to use IPv6
1280
        ipv6 = netutils.IP6Address.IsValid(host)
1281

    
1282
        opts = objects.ImportExportOptions(key_name=key_name,
1283
                                           ca_pem=dest_ca_pem,
1284
                                           magic=magic, ipv6=ipv6)
1285

    
1286
        self._feedback_fn("Sending disk %s to %s:%s" % (idx, host, port))
1287
        finished_fn = compat.partial(self._TransferFinished, idx)
1288
        ieloop.Add(DiskExport(self._lu, instance.primary_node,
1289
                              opts, host, port, instance,
1290
                              constants.IEIO_SCRIPT, (dev, idx),
1291
                              timeouts, cbs, private=(idx, finished_fn)))
1292

    
1293
      ieloop.Run()
1294
    finally:
1295
      ieloop.FinalizeAll()
1296

    
1297
    return (True, cbs.disk_results)
1298

    
1299
  def _TransferFinished(self, idx):
1300
    """Called once a transfer has finished.
1301

1302
    @type idx: number
1303
    @param idx: Disk index
1304

1305
    """
1306
    logging.debug("Transfer %s finished", idx)
1307
    self._RemoveSnapshot(idx)
1308

    
1309
  def Cleanup(self):
1310
    """Remove all snapshots.
1311

1312
    """
1313
    assert len(self._removed_snaps) == len(self._instance.disks)
1314
    for idx in range(len(self._instance.disks)):
1315
      self._RemoveSnapshot(idx)
1316

    
1317

    
1318
class _RemoteImportCb(ImportExportCbBase):
1319
  def __init__(self, feedback_fn, cds, x509_cert_pem, disk_count,
1320
               external_address):
1321
    """Initializes this class.
1322

1323
    @type cds: string
1324
    @param cds: Cluster domain secret
1325
    @type x509_cert_pem: string
1326
    @param x509_cert_pem: CA used for signing import key
1327
    @type disk_count: number
1328
    @param disk_count: Number of disks
1329
    @type external_address: string
1330
    @param external_address: External address of destination node
1331

1332
    """
1333
    ImportExportCbBase.__init__(self)
1334
    self._feedback_fn = feedback_fn
1335
    self._cds = cds
1336
    self._x509_cert_pem = x509_cert_pem
1337
    self._disk_count = disk_count
1338
    self._external_address = external_address
1339

    
1340
    self._dresults = [None] * disk_count
1341
    self._daemon_port = [None] * disk_count
1342

    
1343
    self._salt = utils.GenerateSecret(8)
1344

    
1345
  @property
1346
  def disk_results(self):
1347
    """Returns per-disk results.
1348

1349
    """
1350
    return self._dresults
1351

    
1352
  def _CheckAllListening(self):
1353
    """Checks whether all daemons are listening.
1354

1355
    If all daemons are listening, the information is sent to the client.
1356

1357
    """
1358
    if not compat.all(dp is not None for dp in self._daemon_port):
1359
      return
1360

    
1361
    host = self._external_address
1362

    
1363
    disks = []
1364
    for idx, (port, magic) in enumerate(self._daemon_port):
1365
      disks.append(ComputeRemoteImportDiskInfo(self._cds, self._salt,
1366
                                               idx, host, port, magic))
1367

    
1368
    assert len(disks) == self._disk_count
1369

    
1370
    self._feedback_fn(constants.ELOG_REMOTE_IMPORT, {
1371
      "disks": disks,
1372
      "x509_ca": self._x509_cert_pem,
1373
      })
1374

    
1375
  def ReportListening(self, ie, private):
1376
    """Called when daemon started listening.
1377

1378
    """
1379
    (idx, ) = private
1380

    
1381
    self._feedback_fn("Disk %s is now listening" % idx)
1382

    
1383
    assert self._daemon_port[idx] is None
1384

    
1385
    self._daemon_port[idx] = (ie.listen_port, ie.magic)
1386

    
1387
    self._CheckAllListening()
1388

    
1389
  def ReportConnected(self, ie, private):
1390
    """Called when a connection has been established.
1391

1392
    """
1393
    (idx, ) = private
1394

    
1395
    self._feedback_fn("Disk %s is now receiving data" % idx)
1396

    
1397
  def ReportFinished(self, ie, private):
1398
    """Called when a transfer has finished.
1399

1400
    """
1401
    (idx, ) = private
1402

    
1403
    # Daemon is certainly no longer listening
1404
    self._daemon_port[idx] = None
1405

    
1406
    if ie.success:
1407
      self._feedback_fn("Disk %s finished receiving data" % idx)
1408
    else:
1409
      self._feedback_fn(("Disk %s failed to receive data: %s"
1410
                         " (recent output: %s)") %
1411
                        (idx, ie.final_message, ie.recent_output))
1412

    
1413
    self._dresults[idx] = bool(ie.success)
1414

    
1415

    
1416
def RemoteImport(lu, feedback_fn, instance, pnode, source_x509_ca,
1417
                 cds, timeouts):
1418
  """Imports an instance from another cluster.
1419

1420
  @param lu: Logical unit instance
1421
  @param feedback_fn: Feedback function
1422
  @type instance: L{objects.Instance}
1423
  @param instance: Instance object
1424
  @type pnode: L{objects.Node}
1425
  @param pnode: Primary node of instance as an object
1426
  @type source_x509_ca: OpenSSL.crypto.X509
1427
  @param source_x509_ca: Import source's X509 CA
1428
  @type cds: string
1429
  @param cds: Cluster domain secret
1430
  @type timeouts: L{ImportExportTimeouts}
1431
  @param timeouts: Timeouts for this import
1432

1433
  """
1434
  source_ca_pem = OpenSSL.crypto.dump_certificate(OpenSSL.crypto.FILETYPE_PEM,
1435
                                                  source_x509_ca)
1436

    
1437
  magic_base = utils.GenerateSecret(6)
1438

    
1439
  # Decide whether to use IPv6
1440
  ipv6 = netutils.IP6Address.IsValid(pnode.primary_ip)
1441

    
1442
  # Create crypto key
1443
  result = lu.rpc.call_x509_cert_create(instance.primary_node,
1444
                                        constants.RIE_CERT_VALIDITY)
1445
  result.Raise("Can't create X509 key and certificate on %s" % result.node)
1446

    
1447
  (x509_key_name, x509_cert_pem) = result.payload
1448
  try:
1449
    # Load certificate
1450
    x509_cert = OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_PEM,
1451
                                                x509_cert_pem)
1452

    
1453
    # Sign certificate
1454
    signed_x509_cert_pem = \
1455
      utils.SignX509Certificate(x509_cert, cds, utils.GenerateSecret(8))
1456

    
1457
    cbs = _RemoteImportCb(feedback_fn, cds, signed_x509_cert_pem,
1458
                          len(instance.disks), pnode.primary_ip)
1459

    
1460
    ieloop = ImportExportLoop(lu)
1461
    try:
1462
      for idx, dev in enumerate(instance.disks):
1463
        magic = _GetInstDiskMagic(magic_base, instance.name, idx)
1464

    
1465
        # Import daemon options
1466
        opts = objects.ImportExportOptions(key_name=x509_key_name,
1467
                                           ca_pem=source_ca_pem,
1468
                                           magic=magic, ipv6=ipv6)
1469

    
1470
        ieloop.Add(DiskImport(lu, instance.primary_node, opts, instance,
1471
                              constants.IEIO_SCRIPT, (dev, idx),
1472
                              timeouts, cbs, private=(idx, )))
1473

    
1474
      ieloop.Run()
1475
    finally:
1476
      ieloop.FinalizeAll()
1477
  finally:
1478
    # Remove crypto key and certificate
1479
    result = lu.rpc.call_x509_cert_remove(instance.primary_node, x509_key_name)
1480
    result.Raise("Can't remove X509 key and certificate on %s" % result.node)
1481

    
1482
  return cbs.disk_results
1483

    
1484

    
1485
def _GetImportExportHandshakeMessage(version):
1486
  """Returns the handshake message for a RIE protocol version.
1487

1488
  @type version: number
1489

1490
  """
1491
  return "%s:%s" % (version, constants.RIE_HANDSHAKE)
1492

    
1493

    
1494
def ComputeRemoteExportHandshake(cds):
1495
  """Computes the remote import/export handshake.
1496

1497
  @type cds: string
1498
  @param cds: Cluster domain secret
1499

1500
  """
1501
  salt = utils.GenerateSecret(8)
1502
  msg = _GetImportExportHandshakeMessage(constants.RIE_VERSION)
1503
  return (constants.RIE_VERSION, utils.Sha1Hmac(cds, msg, salt=salt), salt)
1504

    
1505

    
1506
def CheckRemoteExportHandshake(cds, handshake):
1507
  """Checks the handshake of a remote import/export.
1508

1509
  @type cds: string
1510
  @param cds: Cluster domain secret
1511
  @type handshake: sequence
1512
  @param handshake: Handshake sent by remote peer
1513

1514
  """
1515
  try:
1516
    (version, hmac_digest, hmac_salt) = handshake
1517
  except (TypeError, ValueError), err:
1518
    return "Invalid data: %s" % err
1519

    
1520
  if not utils.VerifySha1Hmac(cds, _GetImportExportHandshakeMessage(version),
1521
                              hmac_digest, salt=hmac_salt):
1522
    return "Hash didn't match, clusters don't share the same domain secret"
1523

    
1524
  if version != constants.RIE_VERSION:
1525
    return ("Clusters don't have the same remote import/export protocol"
1526
            " (local=%s, remote=%s)" %
1527
            (constants.RIE_VERSION, version))
1528

    
1529
  return None
1530

    
1531

    
1532
def _GetRieDiskInfoMessage(disk_index, host, port, magic):
1533
  """Returns the hashed text for import/export disk information.
1534

1535
  @type disk_index: number
1536
  @param disk_index: Index of disk (included in hash)
1537
  @type host: string
1538
  @param host: Hostname
1539
  @type port: number
1540
  @param port: Daemon port
1541
  @type magic: string
1542
  @param magic: Magic value
1543

1544
  """
1545
  return "%s:%s:%s:%s" % (disk_index, host, port, magic)
1546

    
1547

    
1548
def CheckRemoteExportDiskInfo(cds, disk_index, disk_info):
1549
  """Verifies received disk information for an export.
1550

1551
  @type cds: string
1552
  @param cds: Cluster domain secret
1553
  @type disk_index: number
1554
  @param disk_index: Index of disk (included in hash)
1555
  @type disk_info: sequence
1556
  @param disk_info: Disk information sent by remote peer
1557

1558
  """
1559
  try:
1560
    (host, port, magic, hmac_digest, hmac_salt) = disk_info
1561
  except (TypeError, ValueError), err:
1562
    raise errors.GenericError("Invalid data: %s" % err)
1563

    
1564
  if not (host and port and magic):
1565
    raise errors.GenericError("Missing destination host, port or magic")
1566

    
1567
  msg = _GetRieDiskInfoMessage(disk_index, host, port, magic)
1568

    
1569
  if not utils.VerifySha1Hmac(cds, msg, hmac_digest, salt=hmac_salt):
1570
    raise errors.GenericError("HMAC is wrong")
1571

    
1572
  if netutils.IP6Address.IsValid(host) or netutils.IP4Address.IsValid(host):
1573
    destination = host
1574
  else:
1575
    destination = netutils.Hostname.GetNormalizedName(host)
1576

    
1577
  return (destination,
1578
          utils.ValidateServiceName(port),
1579
          magic)
1580

    
1581

    
1582
def ComputeRemoteImportDiskInfo(cds, salt, disk_index, host, port, magic):
1583
  """Computes the signed disk information for a remote import.
1584

1585
  @type cds: string
1586
  @param cds: Cluster domain secret
1587
  @type salt: string
1588
  @param salt: HMAC salt
1589
  @type disk_index: number
1590
  @param disk_index: Index of disk (included in hash)
1591
  @type host: string
1592
  @param host: Hostname
1593
  @type port: number
1594
  @param port: Daemon port
1595
  @type magic: string
1596
  @param magic: Magic value
1597

1598
  """
1599
  msg = _GetRieDiskInfoMessage(disk_index, host, port, magic)
1600
  hmac_digest = utils.Sha1Hmac(cds, msg, salt=salt)
1601
  return (host, port, magic, hmac_digest, salt)