Statistics
| Branch: | Tag: | Revision:

root / lib / masterd / instance.py @ d728ac75

History | View | Annotate | Download (44.8 kB)

1
#
2
#
3

    
4
# Copyright (C) 2010, 2011 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Instance-related functions and classes for masterd.
23

24
"""
25

    
26
import logging
27
import time
28
import OpenSSL
29

    
30
from ganeti import constants
31
from ganeti import errors
32
from ganeti import compat
33
from ganeti import utils
34
from ganeti import objects
35
from ganeti import netutils
36

    
37

    
38
class _ImportExportError(Exception):
39
  """Local exception to report import/export errors.
40

41
  """
42

    
43

    
44
class ImportExportTimeouts(object):
45
  #: Time until daemon starts writing status file
46
  DEFAULT_READY_TIMEOUT = 10
47

    
48
  #: Length of time until errors cause hard failure
49
  DEFAULT_ERROR_TIMEOUT = 10
50

    
51
  #: Time after which daemon must be listening
52
  DEFAULT_LISTEN_TIMEOUT = 10
53

    
54
  #: Progress update interval
55
  DEFAULT_PROGRESS_INTERVAL = 60
56

    
57
  __slots__ = [
58
    "error",
59
    "ready",
60
    "listen",
61
    "connect",
62
    "progress",
63
    ]
64

    
65
  def __init__(self, connect,
66
               listen=DEFAULT_LISTEN_TIMEOUT,
67
               error=DEFAULT_ERROR_TIMEOUT,
68
               ready=DEFAULT_READY_TIMEOUT,
69
               progress=DEFAULT_PROGRESS_INTERVAL):
70
    """Initializes this class.
71

72
    @type connect: number
73
    @param connect: Timeout for establishing connection
74
    @type listen: number
75
    @param listen: Timeout for starting to listen for connections
76
    @type error: number
77
    @param error: Length of time until errors cause hard failure
78
    @type ready: number
79
    @param ready: Timeout for daemon to become ready
80
    @type progress: number
81
    @param progress: Progress update interval
82

83
    """
84
    self.error = error
85
    self.ready = ready
86
    self.listen = listen
87
    self.connect = connect
88
    self.progress = progress
89

    
90

    
91
class ImportExportCbBase(object):
92
  """Callbacks for disk import/export.
93

94
  """
95
  def ReportListening(self, ie, private, component):
96
    """Called when daemon started listening.
97

98
    @type ie: Subclass of L{_DiskImportExportBase}
99
    @param ie: Import/export object
100
    @param private: Private data passed to import/export object
101
    @param component: transfer component name
102

103
    """
104

    
105
  def ReportConnected(self, ie, private):
106
    """Called when a connection has been established.
107

108
    @type ie: Subclass of L{_DiskImportExportBase}
109
    @param ie: Import/export object
110
    @param private: Private data passed to import/export object
111

112
    """
113

    
114
  def ReportProgress(self, ie, private):
115
    """Called when new progress information should be reported.
116

117
    @type ie: Subclass of L{_DiskImportExportBase}
118
    @param ie: Import/export object
119
    @param private: Private data passed to import/export object
120

121
    """
122

    
123
  def ReportFinished(self, ie, private):
124
    """Called when a transfer has finished.
125

126
    @type ie: Subclass of L{_DiskImportExportBase}
127
    @param ie: Import/export object
128
    @param private: Private data passed to import/export object
129

130
    """
131

    
132

    
133
def _TimeoutExpired(epoch, timeout, _time_fn=time.time):
134
  """Checks whether a timeout has expired.
135

136
  """
137
  return _time_fn() > (epoch + timeout)
138

    
139

    
140
class _DiskImportExportBase(object):
141
  MODE_TEXT = None
142

    
143
  def __init__(self, lu, node_name, opts,
144
               instance, component, timeouts, cbs, private=None):
145
    """Initializes this class.
146

147
    @param lu: Logical unit instance
148
    @type node_name: string
149
    @param node_name: Node name for import
150
    @type opts: L{objects.ImportExportOptions}
151
    @param opts: Import/export daemon options
152
    @type instance: L{objects.Instance}
153
    @param instance: Instance object
154
    @type component: string
155
    @param component: which part of the instance is being imported
156
    @type timeouts: L{ImportExportTimeouts}
157
    @param timeouts: Timeouts for this import
158
    @type cbs: L{ImportExportCbBase}
159
    @param cbs: Callbacks
160
    @param private: Private data for callback functions
161

162
    """
163
    assert self.MODE_TEXT
164

    
165
    self._lu = lu
166
    self.node_name = node_name
167
    self._opts = opts.Copy()
168
    self._instance = instance
169
    self._component = component
170
    self._timeouts = timeouts
171
    self._cbs = cbs
172
    self._private = private
173

    
174
    # Set master daemon's timeout in options for import/export daemon
175
    assert self._opts.connect_timeout is None
176
    self._opts.connect_timeout = timeouts.connect
177

    
178
    # Parent loop
179
    self._loop = None
180

    
181
    # Timestamps
182
    self._ts_begin = None
183
    self._ts_connected = None
184
    self._ts_finished = None
185
    self._ts_cleanup = None
186
    self._ts_last_progress = None
187
    self._ts_last_error = None
188

    
189
    # Transfer status
190
    self.success = None
191
    self.final_message = None
192

    
193
    # Daemon status
194
    self._daemon_name = None
195
    self._daemon = None
196

    
197
  @property
198
  def recent_output(self):
199
    """Returns the most recent output from the daemon.
200

201
    """
202
    if self._daemon:
203
      return "\n".join(self._daemon.recent_output)
204

    
205
    return None
206

    
207
  @property
208
  def progress(self):
209
    """Returns transfer progress information.
210

211
    """
212
    if not self._daemon:
213
      return None
214

    
215
    return (self._daemon.progress_mbytes,
216
            self._daemon.progress_throughput,
217
            self._daemon.progress_percent,
218
            self._daemon.progress_eta)
219

    
220
  @property
221
  def magic(self):
222
    """Returns the magic value for this import/export.
223

224
    """
225
    return self._opts.magic
226

    
227
  @property
228
  def active(self):
229
    """Determines whether this transport is still active.
230

231
    """
232
    return self.success is None
233

    
234
  @property
235
  def loop(self):
236
    """Returns parent loop.
237

238
    @rtype: L{ImportExportLoop}
239

240
    """
241
    return self._loop
242

    
243
  def SetLoop(self, loop):
244
    """Sets the parent loop.
245

246
    @type loop: L{ImportExportLoop}
247

248
    """
249
    if self._loop:
250
      raise errors.ProgrammerError("Loop can only be set once")
251

    
252
    self._loop = loop
253

    
254
  def _StartDaemon(self):
255
    """Starts the import/export daemon.
256

257
    """
258
    raise NotImplementedError()
259

    
260
  def CheckDaemon(self):
261
    """Checks whether daemon has been started and if not, starts it.
262

263
    @rtype: string
264
    @return: Daemon name
265

266
    """
267
    assert self._ts_cleanup is None
268

    
269
    if self._daemon_name is None:
270
      assert self._ts_begin is None
271

    
272
      result = self._StartDaemon()
273
      if result.fail_msg:
274
        raise _ImportExportError("Failed to start %s on %s: %s" %
275
                                 (self.MODE_TEXT, self.node_name,
276
                                  result.fail_msg))
277

    
278
      daemon_name = result.payload
279

    
280
      logging.info("Started %s '%s' on %s", self.MODE_TEXT, daemon_name,
281
                   self.node_name)
282

    
283
      self._ts_begin = time.time()
284
      self._daemon_name = daemon_name
285

    
286
    return self._daemon_name
287

    
288
  def GetDaemonName(self):
289
    """Returns the daemon name.
290

291
    """
292
    assert self._daemon_name, "Daemon has not been started"
293
    assert self._ts_cleanup is None
294
    return self._daemon_name
295

    
296
  def Abort(self):
297
    """Sends SIGTERM to import/export daemon (if still active).
298

299
    """
300
    if self._daemon_name:
301
      self._lu.LogWarning("Aborting %s '%s' on %s",
302
                          self.MODE_TEXT, self._daemon_name, self.node_name)
303
      result = self._lu.rpc.call_impexp_abort(self.node_name, self._daemon_name)
304
      if result.fail_msg:
305
        self._lu.LogWarning("Failed to abort %s '%s' on %s: %s",
306
                            self.MODE_TEXT, self._daemon_name,
307
                            self.node_name, result.fail_msg)
308
        return False
309

    
310
    return True
311

    
312
  def _SetDaemonData(self, data):
313
    """Internal function for updating status daemon data.
314

315
    @type data: L{objects.ImportExportStatus}
316
    @param data: Daemon status data
317

318
    """
319
    assert self._ts_begin is not None
320

    
321
    if not data:
322
      if _TimeoutExpired(self._ts_begin, self._timeouts.ready):
323
        raise _ImportExportError("Didn't become ready after %s seconds" %
324
                                 self._timeouts.ready)
325

    
326
      return False
327

    
328
    self._daemon = data
329

    
330
    return True
331

    
332
  def SetDaemonData(self, success, data):
333
    """Updates daemon status data.
334

335
    @type success: bool
336
    @param success: Whether fetching data was successful or not
337
    @type data: L{objects.ImportExportStatus}
338
    @param data: Daemon status data
339

340
    """
341
    if not success:
342
      if self._ts_last_error is None:
343
        self._ts_last_error = time.time()
344

    
345
      elif _TimeoutExpired(self._ts_last_error, self._timeouts.error):
346
        raise _ImportExportError("Too many errors while updating data")
347

    
348
      return False
349

    
350
    self._ts_last_error = None
351

    
352
    return self._SetDaemonData(data)
353

    
354
  def CheckListening(self):
355
    """Checks whether the daemon is listening.
356

357
    """
358
    raise NotImplementedError()
359

    
360
  def _GetConnectedCheckEpoch(self):
361
    """Returns timeout to calculate connect timeout.
362

363
    """
364
    raise NotImplementedError()
365

    
366
  def CheckConnected(self):
367
    """Checks whether the daemon is connected.
368

369
    @rtype: bool
370
    @return: Whether the daemon is connected
371

372
    """
373
    assert self._daemon, "Daemon status missing"
374

    
375
    if self._ts_connected is not None:
376
      return True
377

    
378
    if self._daemon.connected:
379
      self._ts_connected = time.time()
380

    
381
      # TODO: Log remote peer
382
      logging.debug("%s '%s' on %s is now connected",
383
                    self.MODE_TEXT, self._daemon_name, self.node_name)
384

    
385
      self._cbs.ReportConnected(self, self._private)
386

    
387
      return True
388

    
389
    if _TimeoutExpired(self._GetConnectedCheckEpoch(), self._timeouts.connect):
390
      raise _ImportExportError("Not connected after %s seconds" %
391
                               self._timeouts.connect)
392

    
393
    return False
394

    
395
  def _CheckProgress(self):
396
    """Checks whether a progress update should be reported.
397

398
    """
399
    if ((self._ts_last_progress is None or
400
         _TimeoutExpired(self._ts_last_progress, self._timeouts.progress)) and
401
        self._daemon and
402
        self._daemon.progress_mbytes is not None and
403
        self._daemon.progress_throughput is not None):
404
      self._cbs.ReportProgress(self, self._private)
405
      self._ts_last_progress = time.time()
406

    
407
  def CheckFinished(self):
408
    """Checks whether the daemon exited.
409

410
    @rtype: bool
411
    @return: Whether the transfer is finished
412

413
    """
414
    assert self._daemon, "Daemon status missing"
415

    
416
    if self._ts_finished:
417
      return True
418

    
419
    if self._daemon.exit_status is None:
420
      # TODO: Adjust delay for ETA expiring soon
421
      self._CheckProgress()
422
      return False
423

    
424
    self._ts_finished = time.time()
425

    
426
    self._ReportFinished(self._daemon.exit_status == 0,
427
                         self._daemon.error_message)
428

    
429
    return True
430

    
431
  def _ReportFinished(self, success, message):
432
    """Transfer is finished or daemon exited.
433

434
    @type success: bool
435
    @param success: Whether the transfer was successful
436
    @type message: string
437
    @param message: Error message
438

439
    """
440
    assert self.success is None
441

    
442
    self.success = success
443
    self.final_message = message
444

    
445
    if success:
446
      logging.info("%s '%s' on %s succeeded", self.MODE_TEXT,
447
                   self._daemon_name, self.node_name)
448
    elif self._daemon_name:
449
      self._lu.LogWarning("%s '%s' on %s failed: %s",
450
                          self.MODE_TEXT, self._daemon_name, self.node_name,
451
                          message)
452
    else:
453
      self._lu.LogWarning("%s on %s failed: %s", self.MODE_TEXT,
454
                          self.node_name, message)
455

    
456
    self._cbs.ReportFinished(self, self._private)
457

    
458
  def _Finalize(self):
459
    """Makes the RPC call to finalize this import/export.
460

461
    """
462
    return self._lu.rpc.call_impexp_cleanup(self.node_name, self._daemon_name)
463

    
464
  def Finalize(self, error=None):
465
    """Finalizes this import/export.
466

467
    """
468
    if self._daemon_name:
469
      logging.info("Finalizing %s '%s' on %s",
470
                   self.MODE_TEXT, self._daemon_name, self.node_name)
471

    
472
      result = self._Finalize()
473
      if result.fail_msg:
474
        self._lu.LogWarning("Failed to finalize %s '%s' on %s: %s",
475
                            self.MODE_TEXT, self._daemon_name,
476
                            self.node_name, result.fail_msg)
477
        return False
478

    
479
      # Daemon is no longer running
480
      self._daemon_name = None
481
      self._ts_cleanup = time.time()
482

    
483
    if error:
484
      self._ReportFinished(False, error)
485

    
486
    return True
487

    
488

    
489
class DiskImport(_DiskImportExportBase):
490
  MODE_TEXT = "import"
491

    
492
  def __init__(self, lu, node_name, opts, instance, component,
493
               dest, dest_args, timeouts, cbs, private=None):
494
    """Initializes this class.
495

496
    @param lu: Logical unit instance
497
    @type node_name: string
498
    @param node_name: Node name for import
499
    @type opts: L{objects.ImportExportOptions}
500
    @param opts: Import/export daemon options
501
    @type instance: L{objects.Instance}
502
    @param instance: Instance object
503
    @type component: string
504
    @param component: which part of the instance is being imported
505
    @param dest: I/O destination
506
    @param dest_args: I/O arguments
507
    @type timeouts: L{ImportExportTimeouts}
508
    @param timeouts: Timeouts for this import
509
    @type cbs: L{ImportExportCbBase}
510
    @param cbs: Callbacks
511
    @param private: Private data for callback functions
512

513
    """
514
    _DiskImportExportBase.__init__(self, lu, node_name, opts, instance,
515
                                   component, timeouts, cbs, private)
516
    self._dest = dest
517
    self._dest_args = dest_args
518

    
519
    # Timestamps
520
    self._ts_listening = None
521

    
522
  @property
523
  def listen_port(self):
524
    """Returns the port the daemon is listening on.
525

526
    """
527
    if self._daemon:
528
      return self._daemon.listen_port
529

    
530
    return None
531

    
532
  def _StartDaemon(self):
533
    """Starts the import daemon.
534

535
    """
536
    return self._lu.rpc.call_import_start(self.node_name, self._opts,
537
                                          self._instance, self._component,
538
                                          self._dest, self._dest_args)
539

    
540
  def CheckListening(self):
541
    """Checks whether the daemon is listening.
542

543
    @rtype: bool
544
    @return: Whether the daemon is listening
545

546
    """
547
    assert self._daemon, "Daemon status missing"
548

    
549
    if self._ts_listening is not None:
550
      return True
551

    
552
    port = self._daemon.listen_port
553
    if port is not None:
554
      self._ts_listening = time.time()
555

    
556
      logging.debug("Import '%s' on %s is now listening on port %s",
557
                    self._daemon_name, self.node_name, port)
558

    
559
      self._cbs.ReportListening(self, self._private, self._component)
560

    
561
      return True
562

    
563
    if _TimeoutExpired(self._ts_begin, self._timeouts.listen):
564
      raise _ImportExportError("Not listening after %s seconds" %
565
                               self._timeouts.listen)
566

    
567
    return False
568

    
569
  def _GetConnectedCheckEpoch(self):
570
    """Returns the time since we started listening.
571

572
    """
573
    assert self._ts_listening is not None, \
574
           ("Checking whether an import is connected is only useful"
575
            " once it's been listening")
576

    
577
    return self._ts_listening
578

    
579

    
580
class DiskExport(_DiskImportExportBase):
581
  MODE_TEXT = "export"
582

    
583
  def __init__(self, lu, node_name, opts, dest_host, dest_port,
584
               instance, component, source, source_args,
585
               timeouts, cbs, private=None):
586
    """Initializes this class.
587

588
    @param lu: Logical unit instance
589
    @type node_name: string
590
    @param node_name: Node name for import
591
    @type opts: L{objects.ImportExportOptions}
592
    @param opts: Import/export daemon options
593
    @type dest_host: string
594
    @param dest_host: Destination host name or IP address
595
    @type dest_port: number
596
    @param dest_port: Destination port number
597
    @type instance: L{objects.Instance}
598
    @param instance: Instance object
599
    @type component: string
600
    @param component: which part of the instance is being imported
601
    @param source: I/O source
602
    @param source_args: I/O source
603
    @type timeouts: L{ImportExportTimeouts}
604
    @param timeouts: Timeouts for this import
605
    @type cbs: L{ImportExportCbBase}
606
    @param cbs: Callbacks
607
    @param private: Private data for callback functions
608

609
    """
610
    _DiskImportExportBase.__init__(self, lu, node_name, opts, instance,
611
                                   component, timeouts, cbs, private)
612
    self._dest_host = dest_host
613
    self._dest_port = dest_port
614
    self._source = source
615
    self._source_args = source_args
616

    
617
  def _StartDaemon(self):
618
    """Starts the export daemon.
619

620
    """
621
    return self._lu.rpc.call_export_start(self.node_name, self._opts,
622
                                          self._dest_host, self._dest_port,
623
                                          self._instance, self._component,
624
                                          self._source, self._source_args)
625

    
626
  def CheckListening(self):
627
    """Checks whether the daemon is listening.
628

629
    """
630
    # Only an import can be listening
631
    return True
632

    
633
  def _GetConnectedCheckEpoch(self):
634
    """Returns the time since the daemon started.
635

636
    """
637
    assert self._ts_begin is not None
638

    
639
    return self._ts_begin
640

    
641

    
642
def FormatProgress(progress):
643
  """Formats progress information for user consumption
644

645
  """
646
  (mbytes, throughput, percent, eta) = progress
647

    
648
  parts = [
649
    utils.FormatUnit(mbytes, "h"),
650

    
651
    # Not using FormatUnit as it doesn't support kilobytes
652
    "%0.1f MiB/s" % throughput,
653
    ]
654

    
655
  if percent is not None:
656
    parts.append("%d%%" % percent)
657

    
658
  if eta is not None:
659
    parts.append("ETA %s" % utils.FormatSeconds(eta))
660

    
661
  return utils.CommaJoin(parts)
662

    
663

    
664
class ImportExportLoop:
665
  MIN_DELAY = 1.0
666
  MAX_DELAY = 20.0
667

    
668
  def __init__(self, lu):
669
    """Initializes this class.
670

671
    """
672
    self._lu = lu
673
    self._queue = []
674
    self._pending_add = []
675

    
676
  def Add(self, diskie):
677
    """Adds an import/export object to the loop.
678

679
    @type diskie: Subclass of L{_DiskImportExportBase}
680
    @param diskie: Import/export object
681

682
    """
683
    assert diskie not in self._pending_add
684
    assert diskie.loop is None
685

    
686
    diskie.SetLoop(self)
687

    
688
    # Adding new objects to a staging list is necessary, otherwise the main
689
    # loop gets confused if callbacks modify the queue while the main loop is
690
    # iterating over it.
691
    self._pending_add.append(diskie)
692

    
693
  @staticmethod
694
  def _CollectDaemonStatus(lu, daemons):
695
    """Collects the status for all import/export daemons.
696

697
    """
698
    daemon_status = {}
699

    
700
    for node_name, names in daemons.iteritems():
701
      result = lu.rpc.call_impexp_status(node_name, names)
702
      if result.fail_msg:
703
        lu.LogWarning("Failed to get daemon status on %s: %s",
704
                      node_name, result.fail_msg)
705
        continue
706

    
707
      assert len(names) == len(result.payload)
708

    
709
      daemon_status[node_name] = dict(zip(names, result.payload))
710

    
711
    return daemon_status
712

    
713
  @staticmethod
714
  def _GetActiveDaemonNames(queue):
715
    """Gets the names of all active daemons.
716

717
    """
718
    result = {}
719
    for diskie in queue:
720
      if not diskie.active:
721
        continue
722

    
723
      try:
724
        # Start daemon if necessary
725
        daemon_name = diskie.CheckDaemon()
726
      except _ImportExportError, err:
727
        logging.exception("%s failed", diskie.MODE_TEXT)
728
        diskie.Finalize(error=str(err))
729
        continue
730

    
731
      result.setdefault(diskie.node_name, []).append(daemon_name)
732

    
733
    assert len(queue) >= len(result)
734
    assert len(queue) >= sum([len(names) for names in result.itervalues()])
735

    
736
    logging.debug("daemons=%r", result)
737

    
738
    return result
739

    
740
  def _AddPendingToQueue(self):
741
    """Adds all pending import/export objects to the internal queue.
742

743
    """
744
    assert compat.all(diskie not in self._queue and diskie.loop == self
745
                      for diskie in self._pending_add)
746

    
747
    self._queue.extend(self._pending_add)
748

    
749
    del self._pending_add[:]
750

    
751
  def Run(self):
752
    """Utility main loop.
753

754
    """
755
    while True:
756
      self._AddPendingToQueue()
757

    
758
      # Collect all active daemon names
759
      daemons = self._GetActiveDaemonNames(self._queue)
760
      if not daemons:
761
        break
762

    
763
      # Collection daemon status data
764
      data = self._CollectDaemonStatus(self._lu, daemons)
765

    
766
      # Use data
767
      delay = self.MAX_DELAY
768
      for diskie in self._queue:
769
        if not diskie.active:
770
          continue
771

    
772
        try:
773
          try:
774
            all_daemon_data = data[diskie.node_name]
775
          except KeyError:
776
            result = diskie.SetDaemonData(False, None)
777
          else:
778
            result = \
779
              diskie.SetDaemonData(True,
780
                                   all_daemon_data[diskie.GetDaemonName()])
781

    
782
          if not result:
783
            # Daemon not yet ready, retry soon
784
            delay = min(3.0, delay)
785
            continue
786

    
787
          if diskie.CheckFinished():
788
            # Transfer finished
789
            diskie.Finalize()
790
            continue
791

    
792
          # Normal case: check again in 5 seconds
793
          delay = min(5.0, delay)
794

    
795
          if not diskie.CheckListening():
796
            # Not yet listening, retry soon
797
            delay = min(1.0, delay)
798
            continue
799

    
800
          if not diskie.CheckConnected():
801
            # Not yet connected, retry soon
802
            delay = min(1.0, delay)
803
            continue
804

    
805
        except _ImportExportError, err:
806
          logging.exception("%s failed", diskie.MODE_TEXT)
807
          diskie.Finalize(error=str(err))
808

    
809
      if not compat.any(diskie.active for diskie in self._queue):
810
        break
811

    
812
      # Wait a bit
813
      delay = min(self.MAX_DELAY, max(self.MIN_DELAY, delay))
814
      logging.debug("Waiting for %ss", delay)
815
      time.sleep(delay)
816

    
817
  def FinalizeAll(self):
818
    """Finalizes all pending transfers.
819

820
    """
821
    success = True
822

    
823
    for diskie in self._queue:
824
      success = diskie.Finalize() and success
825

    
826
    return success
827

    
828

    
829
class _TransferInstCbBase(ImportExportCbBase):
830
  def __init__(self, lu, feedback_fn, instance, timeouts, src_node, src_cbs,
831
               dest_node, dest_ip):
832
    """Initializes this class.
833

834
    """
835
    ImportExportCbBase.__init__(self)
836

    
837
    self.lu = lu
838
    self.feedback_fn = feedback_fn
839
    self.instance = instance
840
    self.timeouts = timeouts
841
    self.src_node = src_node
842
    self.src_cbs = src_cbs
843
    self.dest_node = dest_node
844
    self.dest_ip = dest_ip
845

    
846

    
847
class _TransferInstSourceCb(_TransferInstCbBase):
848
  def ReportConnected(self, ie, dtp):
849
    """Called when a connection has been established.
850

851
    """
852
    assert self.src_cbs is None
853
    assert dtp.src_export == ie
854
    assert dtp.dest_import
855

    
856
    self.feedback_fn("%s is sending data on %s" %
857
                     (dtp.data.name, ie.node_name))
858

    
859
  def ReportProgress(self, ie, dtp):
860
    """Called when new progress information should be reported.
861

862
    """
863
    progress = ie.progress
864
    if not progress:
865
      return
866

    
867
    self.feedback_fn("%s sent %s" % (dtp.data.name, FormatProgress(progress)))
868

    
869
  def ReportFinished(self, ie, dtp):
870
    """Called when a transfer has finished.
871

872
    """
873
    assert self.src_cbs is None
874
    assert dtp.src_export == ie
875
    assert dtp.dest_import
876

    
877
    if ie.success:
878
      self.feedback_fn("%s finished sending data" % dtp.data.name)
879
    else:
880
      self.feedback_fn("%s failed to send data: %s (recent output: %s)" %
881
                       (dtp.data.name, ie.final_message, ie.recent_output))
882

    
883
    dtp.RecordResult(ie.success)
884

    
885
    cb = dtp.data.finished_fn
886
    if cb:
887
      cb()
888

    
889
    # TODO: Check whether sending SIGTERM right away is okay, maybe we should
890
    # give the daemon a moment to sort things out
891
    if dtp.dest_import and not ie.success:
892
      dtp.dest_import.Abort()
893

    
894

    
895
class _TransferInstDestCb(_TransferInstCbBase):
896
  def ReportListening(self, ie, dtp, component):
897
    """Called when daemon started listening.
898

899
    """
900
    assert self.src_cbs
901
    assert dtp.src_export is None
902
    assert dtp.dest_import
903
    assert dtp.export_opts
904

    
905
    self.feedback_fn("%s is now listening, starting export" % dtp.data.name)
906

    
907
    # Start export on source node
908
    de = DiskExport(self.lu, self.src_node, dtp.export_opts,
909
                    self.dest_ip, ie.listen_port, self.instance,
910
                    component, dtp.data.src_io, dtp.data.src_ioargs,
911
                    self.timeouts, self.src_cbs, private=dtp)
912
    ie.loop.Add(de)
913

    
914
    dtp.src_export = de
915

    
916
  def ReportConnected(self, ie, dtp):
917
    """Called when a connection has been established.
918

919
    """
920
    self.feedback_fn("%s is receiving data on %s" %
921
                     (dtp.data.name, self.dest_node))
922

    
923
  def ReportFinished(self, ie, dtp):
924
    """Called when a transfer has finished.
925

926
    """
927
    if ie.success:
928
      self.feedback_fn("%s finished receiving data" % dtp.data.name)
929
    else:
930
      self.feedback_fn("%s failed to receive data: %s (recent output: %s)" %
931
                       (dtp.data.name, ie.final_message, ie.recent_output))
932

    
933
    dtp.RecordResult(ie.success)
934

    
935
    # TODO: Check whether sending SIGTERM right away is okay, maybe we should
936
    # give the daemon a moment to sort things out
937
    if dtp.src_export and not ie.success:
938
      dtp.src_export.Abort()
939

    
940

    
941
class DiskTransfer(object):
942
  def __init__(self, name, src_io, src_ioargs, dest_io, dest_ioargs,
943
               finished_fn):
944
    """Initializes this class.
945

946
    @type name: string
947
    @param name: User-visible name for this transfer (e.g. "disk/0")
948
    @param src_io: Source I/O type
949
    @param src_ioargs: Source I/O arguments
950
    @param dest_io: Destination I/O type
951
    @param dest_ioargs: Destination I/O arguments
952
    @type finished_fn: callable
953
    @param finished_fn: Function called once transfer has finished
954

955
    """
956
    self.name = name
957

    
958
    self.src_io = src_io
959
    self.src_ioargs = src_ioargs
960

    
961
    self.dest_io = dest_io
962
    self.dest_ioargs = dest_ioargs
963

    
964
    self.finished_fn = finished_fn
965

    
966

    
967
class _DiskTransferPrivate(object):
968
  def __init__(self, data, success, export_opts):
969
    """Initializes this class.
970

971
    @type data: L{DiskTransfer}
972
    @type success: bool
973

974
    """
975
    self.data = data
976
    self.success = success
977
    self.export_opts = export_opts
978

    
979
    self.src_export = None
980
    self.dest_import = None
981

    
982
  def RecordResult(self, success):
983
    """Updates the status.
984

985
    One failed part will cause the whole transfer to fail.
986

987
    """
988
    self.success = self.success and success
989

    
990

    
991
def _GetInstDiskMagic(base, instance_name, index):
992
  """Computes the magic value for a disk export or import.
993

994
  @type base: string
995
  @param base: Random seed value (can be the same for all disks of a transfer)
996
  @type instance_name: string
997
  @param instance_name: Name of instance
998
  @type index: number
999
  @param index: Disk index
1000

1001
  """
1002
  h = compat.sha1_hash()
1003
  h.update(str(constants.RIE_VERSION))
1004
  h.update(base)
1005
  h.update(instance_name)
1006
  h.update(str(index))
1007
  return h.hexdigest()
1008

    
1009

    
1010
def TransferInstanceData(lu, feedback_fn, src_node, dest_node, dest_ip,
1011
                         instance, all_transfers):
1012
  """Transfers an instance's data from one node to another.
1013

1014
  @param lu: Logical unit instance
1015
  @param feedback_fn: Feedback function
1016
  @type src_node: string
1017
  @param src_node: Source node name
1018
  @type dest_node: string
1019
  @param dest_node: Destination node name
1020
  @type dest_ip: string
1021
  @param dest_ip: IP address of destination node
1022
  @type instance: L{objects.Instance}
1023
  @param instance: Instance object
1024
  @type all_transfers: list of L{DiskTransfer} instances
1025
  @param all_transfers: List of all disk transfers to be made
1026
  @rtype: list
1027
  @return: List with a boolean (True=successful, False=failed) for success for
1028
           each transfer
1029

1030
  """
1031
  # Disable compression for all moves as these are all within the same cluster
1032
  compress = constants.IEC_NONE
1033

    
1034
  logging.debug("Source node %s, destination node %s, compression '%s'",
1035
                src_node, dest_node, compress)
1036

    
1037
  timeouts = ImportExportTimeouts(constants.DISK_TRANSFER_CONNECT_TIMEOUT)
1038
  src_cbs = _TransferInstSourceCb(lu, feedback_fn, instance, timeouts,
1039
                                  src_node, None, dest_node, dest_ip)
1040
  dest_cbs = _TransferInstDestCb(lu, feedback_fn, instance, timeouts,
1041
                                 src_node, src_cbs, dest_node, dest_ip)
1042

    
1043
  all_dtp = []
1044

    
1045
  base_magic = utils.GenerateSecret(6)
1046

    
1047
  ieloop = ImportExportLoop(lu)
1048
  try:
1049
    for idx, transfer in enumerate(all_transfers):
1050
      if transfer:
1051
        feedback_fn("Exporting %s from %s to %s" %
1052
                    (transfer.name, src_node, dest_node))
1053

    
1054
        magic = _GetInstDiskMagic(base_magic, instance.name, idx)
1055
        opts = objects.ImportExportOptions(key_name=None, ca_pem=None,
1056
                                           compress=compress, magic=magic)
1057

    
1058
        dtp = _DiskTransferPrivate(transfer, True, opts)
1059

    
1060
        di = DiskImport(lu, dest_node, opts, instance, "disk%d" % idx,
1061
                        transfer.dest_io, transfer.dest_ioargs,
1062
                        timeouts, dest_cbs, private=dtp)
1063
        ieloop.Add(di)
1064

    
1065
        dtp.dest_import = di
1066
      else:
1067
        dtp = _DiskTransferPrivate(None, False, None)
1068

    
1069
      all_dtp.append(dtp)
1070

    
1071
    ieloop.Run()
1072
  finally:
1073
    ieloop.FinalizeAll()
1074

    
1075
  assert len(all_dtp) == len(all_transfers)
1076
  assert compat.all((dtp.src_export is None or
1077
                      dtp.src_export.success is not None) and
1078
                     (dtp.dest_import is None or
1079
                      dtp.dest_import.success is not None)
1080
                     for dtp in all_dtp), \
1081
         "Not all imports/exports are finalized"
1082

    
1083
  return [bool(dtp.success) for dtp in all_dtp]
1084

    
1085

    
1086
class _RemoteExportCb(ImportExportCbBase):
1087
  def __init__(self, feedback_fn, disk_count):
1088
    """Initializes this class.
1089

1090
    """
1091
    ImportExportCbBase.__init__(self)
1092
    self._feedback_fn = feedback_fn
1093
    self._dresults = [None] * disk_count
1094

    
1095
  @property
1096
  def disk_results(self):
1097
    """Returns per-disk results.
1098

1099
    """
1100
    return self._dresults
1101

    
1102
  def ReportConnected(self, ie, private):
1103
    """Called when a connection has been established.
1104

1105
    """
1106
    (idx, _) = private
1107

    
1108
    self._feedback_fn("Disk %s is now sending data" % idx)
1109

    
1110
  def ReportProgress(self, ie, private):
1111
    """Called when new progress information should be reported.
1112

1113
    """
1114
    (idx, _) = private
1115

    
1116
    progress = ie.progress
1117
    if not progress:
1118
      return
1119

    
1120
    self._feedback_fn("Disk %s sent %s" % (idx, FormatProgress(progress)))
1121

    
1122
  def ReportFinished(self, ie, private):
1123
    """Called when a transfer has finished.
1124

1125
    """
1126
    (idx, finished_fn) = private
1127

    
1128
    if ie.success:
1129
      self._feedback_fn("Disk %s finished sending data" % idx)
1130
    else:
1131
      self._feedback_fn("Disk %s failed to send data: %s (recent output: %s)" %
1132
                        (idx, ie.final_message, ie.recent_output))
1133

    
1134
    self._dresults[idx] = bool(ie.success)
1135

    
1136
    if finished_fn:
1137
      finished_fn()
1138

    
1139

    
1140
class ExportInstanceHelper:
1141
  def __init__(self, lu, feedback_fn, instance):
1142
    """Initializes this class.
1143

1144
    @param lu: Logical unit instance
1145
    @param feedback_fn: Feedback function
1146
    @type instance: L{objects.Instance}
1147
    @param instance: Instance object
1148

1149
    """
1150
    self._lu = lu
1151
    self._feedback_fn = feedback_fn
1152
    self._instance = instance
1153

    
1154
    self._snap_disks = []
1155
    self._removed_snaps = [False] * len(instance.disks)
1156

    
1157
  def CreateSnapshots(self):
1158
    """Creates an LVM snapshot for every disk of the instance.
1159

1160
    """
1161
    assert not self._snap_disks
1162

    
1163
    instance = self._instance
1164
    src_node = instance.primary_node
1165

    
1166
    for idx, disk in enumerate(instance.disks):
1167
      self._feedback_fn("Creating a snapshot of disk/%s on node %s" %
1168
                        (idx, src_node))
1169

    
1170
      # result.payload will be a snapshot of an lvm leaf of the one we
1171
      # passed
1172
      result = self._lu.rpc.call_blockdev_snapshot(src_node, disk)
1173
      new_dev = False
1174
      msg = result.fail_msg
1175
      if msg:
1176
        self._lu.LogWarning("Could not snapshot disk/%s on node %s: %s",
1177
                            idx, src_node, msg)
1178
      elif (not isinstance(result.payload, (tuple, list)) or
1179
            len(result.payload) != 2):
1180
        self._lu.LogWarning("Could not snapshot disk/%s on node %s: invalid"
1181
                            " result '%s'", idx, src_node, result.payload)
1182
      else:
1183
        disk_id = tuple(result.payload)
1184
        new_dev = objects.Disk(dev_type=constants.LD_LV, size=disk.size,
1185
                               logical_id=disk_id, physical_id=disk_id,
1186
                               iv_name=disk.iv_name)
1187

    
1188
      self._snap_disks.append(new_dev)
1189

    
1190
    assert len(self._snap_disks) == len(instance.disks)
1191
    assert len(self._removed_snaps) == len(instance.disks)
1192

    
1193
  def _RemoveSnapshot(self, disk_index):
1194
    """Removes an LVM snapshot.
1195

1196
    @type disk_index: number
1197
    @param disk_index: Index of the snapshot to be removed
1198

1199
    """
1200
    disk = self._snap_disks[disk_index]
1201
    if disk and not self._removed_snaps[disk_index]:
1202
      src_node = self._instance.primary_node
1203

    
1204
      self._feedback_fn("Removing snapshot of disk/%s on node %s" %
1205
                        (disk_index, src_node))
1206

    
1207
      result = self._lu.rpc.call_blockdev_remove(src_node, disk)
1208
      if result.fail_msg:
1209
        self._lu.LogWarning("Could not remove snapshot for disk/%d from node"
1210
                            " %s: %s", disk_index, src_node, result.fail_msg)
1211
      else:
1212
        self._removed_snaps[disk_index] = True
1213

    
1214
  def LocalExport(self, dest_node):
1215
    """Intra-cluster instance export.
1216

1217
    @type dest_node: L{objects.Node}
1218
    @param dest_node: Destination node
1219

1220
    """
1221
    instance = self._instance
1222
    src_node = instance.primary_node
1223

    
1224
    assert len(self._snap_disks) == len(instance.disks)
1225

    
1226
    transfers = []
1227

    
1228
    for idx, dev in enumerate(self._snap_disks):
1229
      if not dev:
1230
        transfers.append(None)
1231
        continue
1232

    
1233
      path = utils.PathJoin(constants.EXPORT_DIR, "%s.new" % instance.name,
1234
                            dev.physical_id[1])
1235

    
1236
      finished_fn = compat.partial(self._TransferFinished, idx)
1237

    
1238
      # FIXME: pass debug option from opcode to backend
1239
      dt = DiskTransfer("snapshot/%s" % idx,
1240
                        constants.IEIO_SCRIPT, (dev, idx),
1241
                        constants.IEIO_FILE, (path, ),
1242
                        finished_fn)
1243
      transfers.append(dt)
1244

    
1245
    # Actually export data
1246
    dresults = TransferInstanceData(self._lu, self._feedback_fn,
1247
                                    src_node, dest_node.name,
1248
                                    dest_node.secondary_ip,
1249
                                    instance, transfers)
1250

    
1251
    assert len(dresults) == len(instance.disks)
1252

    
1253
    self._feedback_fn("Finalizing export on %s" % dest_node.name)
1254
    result = self._lu.rpc.call_finalize_export(dest_node.name, instance,
1255
                                               self._snap_disks)
1256
    msg = result.fail_msg
1257
    fin_resu = not msg
1258
    if msg:
1259
      self._lu.LogWarning("Could not finalize export for instance %s"
1260
                          " on node %s: %s", instance.name, dest_node.name, msg)
1261

    
1262
    return (fin_resu, dresults)
1263

    
1264
  def RemoteExport(self, disk_info, key_name, dest_ca_pem, timeouts):
1265
    """Inter-cluster instance export.
1266

1267
    @type disk_info: list
1268
    @param disk_info: Per-disk destination information
1269
    @type key_name: string
1270
    @param key_name: Name of X509 key to use
1271
    @type dest_ca_pem: string
1272
    @param dest_ca_pem: Destination X509 CA in PEM format
1273
    @type timeouts: L{ImportExportTimeouts}
1274
    @param timeouts: Timeouts for this import
1275

1276
    """
1277
    instance = self._instance
1278

    
1279
    assert len(disk_info) == len(instance.disks)
1280

    
1281
    cbs = _RemoteExportCb(self._feedback_fn, len(instance.disks))
1282

    
1283
    ieloop = ImportExportLoop(self._lu)
1284
    try:
1285
      for idx, (dev, (host, port, magic)) in enumerate(zip(instance.disks,
1286
                                                           disk_info)):
1287
        # Decide whether to use IPv6
1288
        ipv6 = netutils.IP6Address.IsValid(host)
1289

    
1290
        opts = objects.ImportExportOptions(key_name=key_name,
1291
                                           ca_pem=dest_ca_pem,
1292
                                           magic=magic, ipv6=ipv6)
1293

    
1294
        self._feedback_fn("Sending disk %s to %s:%s" % (idx, host, port))
1295
        finished_fn = compat.partial(self._TransferFinished, idx)
1296
        ieloop.Add(DiskExport(self._lu, instance.primary_node,
1297
                              opts, host, port, instance, "disk%d" % idx,
1298
                              constants.IEIO_SCRIPT, (dev, idx),
1299
                              timeouts, cbs, private=(idx, finished_fn)))
1300

    
1301
      ieloop.Run()
1302
    finally:
1303
      ieloop.FinalizeAll()
1304

    
1305
    return (True, cbs.disk_results)
1306

    
1307
  def _TransferFinished(self, idx):
1308
    """Called once a transfer has finished.
1309

1310
    @type idx: number
1311
    @param idx: Disk index
1312

1313
    """
1314
    logging.debug("Transfer %s finished", idx)
1315
    self._RemoveSnapshot(idx)
1316

    
1317
  def Cleanup(self):
1318
    """Remove all snapshots.
1319

1320
    """
1321
    assert len(self._removed_snaps) == len(self._instance.disks)
1322
    for idx in range(len(self._instance.disks)):
1323
      self._RemoveSnapshot(idx)
1324

    
1325

    
1326
class _RemoteImportCb(ImportExportCbBase):
1327
  def __init__(self, feedback_fn, cds, x509_cert_pem, disk_count,
1328
               external_address):
1329
    """Initializes this class.
1330

1331
    @type cds: string
1332
    @param cds: Cluster domain secret
1333
    @type x509_cert_pem: string
1334
    @param x509_cert_pem: CA used for signing import key
1335
    @type disk_count: number
1336
    @param disk_count: Number of disks
1337
    @type external_address: string
1338
    @param external_address: External address of destination node
1339

1340
    """
1341
    ImportExportCbBase.__init__(self)
1342
    self._feedback_fn = feedback_fn
1343
    self._cds = cds
1344
    self._x509_cert_pem = x509_cert_pem
1345
    self._disk_count = disk_count
1346
    self._external_address = external_address
1347

    
1348
    self._dresults = [None] * disk_count
1349
    self._daemon_port = [None] * disk_count
1350

    
1351
    self._salt = utils.GenerateSecret(8)
1352

    
1353
  @property
1354
  def disk_results(self):
1355
    """Returns per-disk results.
1356

1357
    """
1358
    return self._dresults
1359

    
1360
  def _CheckAllListening(self):
1361
    """Checks whether all daemons are listening.
1362

1363
    If all daemons are listening, the information is sent to the client.
1364

1365
    """
1366
    if not compat.all(dp is not None for dp in self._daemon_port):
1367
      return
1368

    
1369
    host = self._external_address
1370

    
1371
    disks = []
1372
    for idx, (port, magic) in enumerate(self._daemon_port):
1373
      disks.append(ComputeRemoteImportDiskInfo(self._cds, self._salt,
1374
                                               idx, host, port, magic))
1375

    
1376
    assert len(disks) == self._disk_count
1377

    
1378
    self._feedback_fn(constants.ELOG_REMOTE_IMPORT, {
1379
      "disks": disks,
1380
      "x509_ca": self._x509_cert_pem,
1381
      })
1382

    
1383
  def ReportListening(self, ie, private, _):
1384
    """Called when daemon started listening.
1385

1386
    """
1387
    (idx, ) = private
1388

    
1389
    self._feedback_fn("Disk %s is now listening" % idx)
1390

    
1391
    assert self._daemon_port[idx] is None
1392

    
1393
    self._daemon_port[idx] = (ie.listen_port, ie.magic)
1394

    
1395
    self._CheckAllListening()
1396

    
1397
  def ReportConnected(self, ie, private):
1398
    """Called when a connection has been established.
1399

1400
    """
1401
    (idx, ) = private
1402

    
1403
    self._feedback_fn("Disk %s is now receiving data" % idx)
1404

    
1405
  def ReportFinished(self, ie, private):
1406
    """Called when a transfer has finished.
1407

1408
    """
1409
    (idx, ) = private
1410

    
1411
    # Daemon is certainly no longer listening
1412
    self._daemon_port[idx] = None
1413

    
1414
    if ie.success:
1415
      self._feedback_fn("Disk %s finished receiving data" % idx)
1416
    else:
1417
      self._feedback_fn(("Disk %s failed to receive data: %s"
1418
                         " (recent output: %s)") %
1419
                        (idx, ie.final_message, ie.recent_output))
1420

    
1421
    self._dresults[idx] = bool(ie.success)
1422

    
1423

    
1424
def RemoteImport(lu, feedback_fn, instance, pnode, source_x509_ca,
1425
                 cds, timeouts):
1426
  """Imports an instance from another cluster.
1427

1428
  @param lu: Logical unit instance
1429
  @param feedback_fn: Feedback function
1430
  @type instance: L{objects.Instance}
1431
  @param instance: Instance object
1432
  @type pnode: L{objects.Node}
1433
  @param pnode: Primary node of instance as an object
1434
  @type source_x509_ca: OpenSSL.crypto.X509
1435
  @param source_x509_ca: Import source's X509 CA
1436
  @type cds: string
1437
  @param cds: Cluster domain secret
1438
  @type timeouts: L{ImportExportTimeouts}
1439
  @param timeouts: Timeouts for this import
1440

1441
  """
1442
  source_ca_pem = OpenSSL.crypto.dump_certificate(OpenSSL.crypto.FILETYPE_PEM,
1443
                                                  source_x509_ca)
1444

    
1445
  magic_base = utils.GenerateSecret(6)
1446

    
1447
  # Decide whether to use IPv6
1448
  ipv6 = netutils.IP6Address.IsValid(pnode.primary_ip)
1449

    
1450
  # Create crypto key
1451
  result = lu.rpc.call_x509_cert_create(instance.primary_node,
1452
                                        constants.RIE_CERT_VALIDITY)
1453
  result.Raise("Can't create X509 key and certificate on %s" % result.node)
1454

    
1455
  (x509_key_name, x509_cert_pem) = result.payload
1456
  try:
1457
    # Load certificate
1458
    x509_cert = OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_PEM,
1459
                                                x509_cert_pem)
1460

    
1461
    # Sign certificate
1462
    signed_x509_cert_pem = \
1463
      utils.SignX509Certificate(x509_cert, cds, utils.GenerateSecret(8))
1464

    
1465
    cbs = _RemoteImportCb(feedback_fn, cds, signed_x509_cert_pem,
1466
                          len(instance.disks), pnode.primary_ip)
1467

    
1468
    ieloop = ImportExportLoop(lu)
1469
    try:
1470
      for idx, dev in enumerate(instance.disks):
1471
        magic = _GetInstDiskMagic(magic_base, instance.name, idx)
1472

    
1473
        # Import daemon options
1474
        opts = objects.ImportExportOptions(key_name=x509_key_name,
1475
                                           ca_pem=source_ca_pem,
1476
                                           magic=magic, ipv6=ipv6)
1477

    
1478
        ieloop.Add(DiskImport(lu, instance.primary_node, opts, instance,
1479
                              "disk%d" % idx,
1480
                              constants.IEIO_SCRIPT, (dev, idx),
1481
                              timeouts, cbs, private=(idx, )))
1482

    
1483
      ieloop.Run()
1484
    finally:
1485
      ieloop.FinalizeAll()
1486
  finally:
1487
    # Remove crypto key and certificate
1488
    result = lu.rpc.call_x509_cert_remove(instance.primary_node, x509_key_name)
1489
    result.Raise("Can't remove X509 key and certificate on %s" % result.node)
1490

    
1491
  return cbs.disk_results
1492

    
1493

    
1494
def _GetImportExportHandshakeMessage(version):
1495
  """Returns the handshake message for a RIE protocol version.
1496

1497
  @type version: number
1498

1499
  """
1500
  return "%s:%s" % (version, constants.RIE_HANDSHAKE)
1501

    
1502

    
1503
def ComputeRemoteExportHandshake(cds):
1504
  """Computes the remote import/export handshake.
1505

1506
  @type cds: string
1507
  @param cds: Cluster domain secret
1508

1509
  """
1510
  salt = utils.GenerateSecret(8)
1511
  msg = _GetImportExportHandshakeMessage(constants.RIE_VERSION)
1512
  return (constants.RIE_VERSION, utils.Sha1Hmac(cds, msg, salt=salt), salt)
1513

    
1514

    
1515
def CheckRemoteExportHandshake(cds, handshake):
1516
  """Checks the handshake of a remote import/export.
1517

1518
  @type cds: string
1519
  @param cds: Cluster domain secret
1520
  @type handshake: sequence
1521
  @param handshake: Handshake sent by remote peer
1522

1523
  """
1524
  try:
1525
    (version, hmac_digest, hmac_salt) = handshake
1526
  except (TypeError, ValueError), err:
1527
    return "Invalid data: %s" % err
1528

    
1529
  if not utils.VerifySha1Hmac(cds, _GetImportExportHandshakeMessage(version),
1530
                              hmac_digest, salt=hmac_salt):
1531
    return "Hash didn't match, clusters don't share the same domain secret"
1532

    
1533
  if version != constants.RIE_VERSION:
1534
    return ("Clusters don't have the same remote import/export protocol"
1535
            " (local=%s, remote=%s)" %
1536
            (constants.RIE_VERSION, version))
1537

    
1538
  return None
1539

    
1540

    
1541
def _GetRieDiskInfoMessage(disk_index, host, port, magic):
1542
  """Returns the hashed text for import/export disk information.
1543

1544
  @type disk_index: number
1545
  @param disk_index: Index of disk (included in hash)
1546
  @type host: string
1547
  @param host: Hostname
1548
  @type port: number
1549
  @param port: Daemon port
1550
  @type magic: string
1551
  @param magic: Magic value
1552

1553
  """
1554
  return "%s:%s:%s:%s" % (disk_index, host, port, magic)
1555

    
1556

    
1557
def CheckRemoteExportDiskInfo(cds, disk_index, disk_info):
1558
  """Verifies received disk information for an export.
1559

1560
  @type cds: string
1561
  @param cds: Cluster domain secret
1562
  @type disk_index: number
1563
  @param disk_index: Index of disk (included in hash)
1564
  @type disk_info: sequence
1565
  @param disk_info: Disk information sent by remote peer
1566

1567
  """
1568
  try:
1569
    (host, port, magic, hmac_digest, hmac_salt) = disk_info
1570
  except (TypeError, ValueError), err:
1571
    raise errors.GenericError("Invalid data: %s" % err)
1572

    
1573
  if not (host and port and magic):
1574
    raise errors.GenericError("Missing destination host, port or magic")
1575

    
1576
  msg = _GetRieDiskInfoMessage(disk_index, host, port, magic)
1577

    
1578
  if not utils.VerifySha1Hmac(cds, msg, hmac_digest, salt=hmac_salt):
1579
    raise errors.GenericError("HMAC is wrong")
1580

    
1581
  if netutils.IP6Address.IsValid(host) or netutils.IP4Address.IsValid(host):
1582
    destination = host
1583
  else:
1584
    destination = netutils.Hostname.GetNormalizedName(host)
1585

    
1586
  return (destination,
1587
          utils.ValidateServiceName(port),
1588
          magic)
1589

    
1590

    
1591
def ComputeRemoteImportDiskInfo(cds, salt, disk_index, host, port, magic):
1592
  """Computes the signed disk information for a remote import.
1593

1594
  @type cds: string
1595
  @param cds: Cluster domain secret
1596
  @type salt: string
1597
  @param salt: HMAC salt
1598
  @type disk_index: number
1599
  @param disk_index: Index of disk (included in hash)
1600
  @type host: string
1601
  @param host: Hostname
1602
  @type port: number
1603
  @param port: Daemon port
1604
  @type magic: string
1605
  @param magic: Magic value
1606

1607
  """
1608
  msg = _GetRieDiskInfoMessage(disk_index, host, port, magic)
1609
  hmac_digest = utils.Sha1Hmac(cds, msg, salt=salt)
1610
  return (host, port, magic, hmac_digest, salt)