Statistics
| Branch: | Tag: | Revision:

root / lib / masterd / instance.py @ b8c160c1

History | View | Annotate | Download (44.7 kB)

1
#
2
#
3

    
4
# Copyright (C) 2010, 2011 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Instance-related functions and classes for masterd.
23

24
"""
25

    
26
import logging
27
import time
28
import OpenSSL
29

    
30
from ganeti import constants
31
from ganeti import errors
32
from ganeti import compat
33
from ganeti import utils
34
from ganeti import objects
35
from ganeti import netutils
36

    
37

    
38
class _ImportExportError(Exception):
39
  """Local exception to report import/export errors.
40

41
  """
42

    
43

    
44
class ImportExportTimeouts(object):
  """Timeout configuration for a single import/export operation.

  """
  #: Time until daemon starts writing status file
  DEFAULT_READY_TIMEOUT = 10

  #: Length of time until errors cause hard failure
  DEFAULT_ERROR_TIMEOUT = 10

  #: Time after which daemon must be listening
  DEFAULT_LISTEN_TIMEOUT = 10

  #: Progress update interval
  DEFAULT_PROGRESS_INTERVAL = 60

  __slots__ = [
    "connect",
    "error",
    "listen",
    "progress",
    "ready",
    ]

  def __init__(self, connect,
               listen=DEFAULT_LISTEN_TIMEOUT,
               error=DEFAULT_ERROR_TIMEOUT,
               ready=DEFAULT_READY_TIMEOUT,
               progress=DEFAULT_PROGRESS_INTERVAL):
    """Initializes this class.

    @type connect: number
    @param connect: Timeout for establishing connection
    @type listen: number
    @param listen: Timeout for starting to listen for connections
    @type error: number
    @param error: Length of time until errors cause hard failure
    @type ready: number
    @param ready: Timeout for daemon to become ready
    @type progress: number
    @param progress: Progress update interval

    """
    self.connect = connect
    self.listen = listen
    self.error = error
    self.ready = ready
    self.progress = progress
89

    
90

    
91
class ImportExportCbBase(object):
  """Callbacks for disk import/export.

  All callbacks are no-ops by default; subclasses override the events they
  care about.

  """
  def ReportListening(self, ie, private, component):
    """Called when daemon started listening.

    @type ie: Subclass of L{_DiskImportExportBase}
    @param ie: Import/export object
    @param private: Private data passed to import/export object
    @param component: transfer component name

    """

  def ReportConnected(self, ie, private):
    """Called when a connection has been established.

    @type ie: Subclass of L{_DiskImportExportBase}
    @param ie: Import/export object
    @param private: Private data passed to import/export object

    """

  def ReportProgress(self, ie, private):
    """Called when new progress information should be reported.

    @type ie: Subclass of L{_DiskImportExportBase}
    @param ie: Import/export object
    @param private: Private data passed to import/export object

    """

  def ReportFinished(self, ie, private):
    """Called when a transfer has finished.

    @type ie: Subclass of L{_DiskImportExportBase}
    @param ie: Import/export object
    @param private: Private data passed to import/export object

    """
131

    
132

    
133
class _DiskImportExportBase(object):
134
  MODE_TEXT = None
135

    
136
  def __init__(self, lu, node_name, opts,
137
               instance, component, timeouts, cbs, private=None):
138
    """Initializes this class.
139

140
    @param lu: Logical unit instance
141
    @type node_name: string
142
    @param node_name: Node name for import
143
    @type opts: L{objects.ImportExportOptions}
144
    @param opts: Import/export daemon options
145
    @type instance: L{objects.Instance}
146
    @param instance: Instance object
147
    @type component: string
148
    @param component: which part of the instance is being imported
149
    @type timeouts: L{ImportExportTimeouts}
150
    @param timeouts: Timeouts for this import
151
    @type cbs: L{ImportExportCbBase}
152
    @param cbs: Callbacks
153
    @param private: Private data for callback functions
154

155
    """
156
    assert self.MODE_TEXT
157

    
158
    self._lu = lu
159
    self.node_name = node_name
160
    self._opts = opts.Copy()
161
    self._instance = instance
162
    self._component = component
163
    self._timeouts = timeouts
164
    self._cbs = cbs
165
    self._private = private
166

    
167
    # Set master daemon's timeout in options for import/export daemon
168
    assert self._opts.connect_timeout is None
169
    self._opts.connect_timeout = timeouts.connect
170

    
171
    # Parent loop
172
    self._loop = None
173

    
174
    # Timestamps
175
    self._ts_begin = None
176
    self._ts_connected = None
177
    self._ts_finished = None
178
    self._ts_cleanup = None
179
    self._ts_last_progress = None
180
    self._ts_last_error = None
181

    
182
    # Transfer status
183
    self.success = None
184
    self.final_message = None
185

    
186
    # Daemon status
187
    self._daemon_name = None
188
    self._daemon = None
189

    
190
  @property
191
  def recent_output(self):
192
    """Returns the most recent output from the daemon.
193

194
    """
195
    if self._daemon:
196
      return "\n".join(self._daemon.recent_output)
197

    
198
    return None
199

    
200
  @property
201
  def progress(self):
202
    """Returns transfer progress information.
203

204
    """
205
    if not self._daemon:
206
      return None
207

    
208
    return (self._daemon.progress_mbytes,
209
            self._daemon.progress_throughput,
210
            self._daemon.progress_percent,
211
            self._daemon.progress_eta)
212

    
213
  @property
214
  def magic(self):
215
    """Returns the magic value for this import/export.
216

217
    """
218
    return self._opts.magic
219

    
220
  @property
221
  def active(self):
222
    """Determines whether this transport is still active.
223

224
    """
225
    return self.success is None
226

    
227
  @property
228
  def loop(self):
229
    """Returns parent loop.
230

231
    @rtype: L{ImportExportLoop}
232

233
    """
234
    return self._loop
235

    
236
  def SetLoop(self, loop):
237
    """Sets the parent loop.
238

239
    @type loop: L{ImportExportLoop}
240

241
    """
242
    if self._loop:
243
      raise errors.ProgrammerError("Loop can only be set once")
244

    
245
    self._loop = loop
246

    
247
  def _StartDaemon(self):
248
    """Starts the import/export daemon.
249

250
    """
251
    raise NotImplementedError()
252

    
253
  def CheckDaemon(self):
254
    """Checks whether daemon has been started and if not, starts it.
255

256
    @rtype: string
257
    @return: Daemon name
258

259
    """
260
    assert self._ts_cleanup is None
261

    
262
    if self._daemon_name is None:
263
      assert self._ts_begin is None
264

    
265
      result = self._StartDaemon()
266
      if result.fail_msg:
267
        raise _ImportExportError("Failed to start %s on %s: %s" %
268
                                 (self.MODE_TEXT, self.node_name,
269
                                  result.fail_msg))
270

    
271
      daemon_name = result.payload
272

    
273
      logging.info("Started %s '%s' on %s", self.MODE_TEXT, daemon_name,
274
                   self.node_name)
275

    
276
      self._ts_begin = time.time()
277
      self._daemon_name = daemon_name
278

    
279
    return self._daemon_name
280

    
281
  def GetDaemonName(self):
282
    """Returns the daemon name.
283

284
    """
285
    assert self._daemon_name, "Daemon has not been started"
286
    assert self._ts_cleanup is None
287
    return self._daemon_name
288

    
289
  def Abort(self):
290
    """Sends SIGTERM to import/export daemon (if still active).
291

292
    """
293
    if self._daemon_name:
294
      self._lu.LogWarning("Aborting %s '%s' on %s",
295
                          self.MODE_TEXT, self._daemon_name, self.node_name)
296
      result = self._lu.rpc.call_impexp_abort(self.node_name, self._daemon_name)
297
      if result.fail_msg:
298
        self._lu.LogWarning("Failed to abort %s '%s' on %s: %s",
299
                            self.MODE_TEXT, self._daemon_name,
300
                            self.node_name, result.fail_msg)
301
        return False
302

    
303
    return True
304

    
305
  def _SetDaemonData(self, data):
306
    """Internal function for updating status daemon data.
307

308
    @type data: L{objects.ImportExportStatus}
309
    @param data: Daemon status data
310

311
    """
312
    assert self._ts_begin is not None
313

    
314
    if not data:
315
      if utils.TimeoutExpired(self._ts_begin, self._timeouts.ready):
316
        raise _ImportExportError("Didn't become ready after %s seconds" %
317
                                 self._timeouts.ready)
318

    
319
      return False
320

    
321
    self._daemon = data
322

    
323
    return True
324

    
325
  def SetDaemonData(self, success, data):
326
    """Updates daemon status data.
327

328
    @type success: bool
329
    @param success: Whether fetching data was successful or not
330
    @type data: L{objects.ImportExportStatus}
331
    @param data: Daemon status data
332

333
    """
334
    if not success:
335
      if self._ts_last_error is None:
336
        self._ts_last_error = time.time()
337

    
338
      elif utils.TimeoutExpired(self._ts_last_error, self._timeouts.error):
339
        raise _ImportExportError("Too many errors while updating data")
340

    
341
      return False
342

    
343
    self._ts_last_error = None
344

    
345
    return self._SetDaemonData(data)
346

    
347
  def CheckListening(self):
348
    """Checks whether the daemon is listening.
349

350
    """
351
    raise NotImplementedError()
352

    
353
  def _GetConnectedCheckEpoch(self):
354
    """Returns timeout to calculate connect timeout.
355

356
    """
357
    raise NotImplementedError()
358

    
359
  def CheckConnected(self):
360
    """Checks whether the daemon is connected.
361

362
    @rtype: bool
363
    @return: Whether the daemon is connected
364

365
    """
366
    assert self._daemon, "Daemon status missing"
367

    
368
    if self._ts_connected is not None:
369
      return True
370

    
371
    if self._daemon.connected:
372
      self._ts_connected = time.time()
373

    
374
      # TODO: Log remote peer
375
      logging.debug("%s '%s' on %s is now connected",
376
                    self.MODE_TEXT, self._daemon_name, self.node_name)
377

    
378
      self._cbs.ReportConnected(self, self._private)
379

    
380
      return True
381

    
382
    if utils.TimeoutExpired(self._GetConnectedCheckEpoch(),
383
                            self._timeouts.connect):
384
      raise _ImportExportError("Not connected after %s seconds" %
385
                               self._timeouts.connect)
386

    
387
    return False
388

    
389
  def _CheckProgress(self):
390
    """Checks whether a progress update should be reported.
391

392
    """
393
    if ((self._ts_last_progress is None or
394
        utils.TimeoutExpired(self._ts_last_progress,
395
                             self._timeouts.progress)) and
396
        self._daemon and
397
        self._daemon.progress_mbytes is not None and
398
        self._daemon.progress_throughput is not None):
399
      self._cbs.ReportProgress(self, self._private)
400
      self._ts_last_progress = time.time()
401

    
402
  def CheckFinished(self):
403
    """Checks whether the daemon exited.
404

405
    @rtype: bool
406
    @return: Whether the transfer is finished
407

408
    """
409
    assert self._daemon, "Daemon status missing"
410

    
411
    if self._ts_finished:
412
      return True
413

    
414
    if self._daemon.exit_status is None:
415
      # TODO: Adjust delay for ETA expiring soon
416
      self._CheckProgress()
417
      return False
418

    
419
    self._ts_finished = time.time()
420

    
421
    self._ReportFinished(self._daemon.exit_status == 0,
422
                         self._daemon.error_message)
423

    
424
    return True
425

    
426
  def _ReportFinished(self, success, message):
427
    """Transfer is finished or daemon exited.
428

429
    @type success: bool
430
    @param success: Whether the transfer was successful
431
    @type message: string
432
    @param message: Error message
433

434
    """
435
    assert self.success is None
436

    
437
    self.success = success
438
    self.final_message = message
439

    
440
    if success:
441
      logging.info("%s '%s' on %s succeeded", self.MODE_TEXT,
442
                   self._daemon_name, self.node_name)
443
    elif self._daemon_name:
444
      self._lu.LogWarning("%s '%s' on %s failed: %s",
445
                          self.MODE_TEXT, self._daemon_name, self.node_name,
446
                          message)
447
    else:
448
      self._lu.LogWarning("%s on %s failed: %s", self.MODE_TEXT,
449
                          self.node_name, message)
450

    
451
    self._cbs.ReportFinished(self, self._private)
452

    
453
  def _Finalize(self):
454
    """Makes the RPC call to finalize this import/export.
455

456
    """
457
    return self._lu.rpc.call_impexp_cleanup(self.node_name, self._daemon_name)
458

    
459
  def Finalize(self, error=None):
460
    """Finalizes this import/export.
461

462
    """
463
    if self._daemon_name:
464
      logging.info("Finalizing %s '%s' on %s",
465
                   self.MODE_TEXT, self._daemon_name, self.node_name)
466

    
467
      result = self._Finalize()
468
      if result.fail_msg:
469
        self._lu.LogWarning("Failed to finalize %s '%s' on %s: %s",
470
                            self.MODE_TEXT, self._daemon_name,
471
                            self.node_name, result.fail_msg)
472
        return False
473

    
474
      # Daemon is no longer running
475
      self._daemon_name = None
476
      self._ts_cleanup = time.time()
477

    
478
    if error:
479
      self._ReportFinished(False, error)
480

    
481
    return True
482

    
483

    
484
class DiskImport(_DiskImportExportBase):
  """Handles the receiving (import) side of a single disk transfer.

  """
  MODE_TEXT = "import"

  def __init__(self, lu, node_name, opts, instance, component,
               dest, dest_args, timeouts, cbs, private=None):
    """Initializes this class.

    @param lu: Logical unit instance
    @type node_name: string
    @param node_name: Node name for import
    @type opts: L{objects.ImportExportOptions}
    @param opts: Import/export daemon options
    @type instance: L{objects.Instance}
    @param instance: Instance object
    @type component: string
    @param component: which part of the instance is being imported
    @param dest: I/O destination
    @param dest_args: I/O arguments
    @type timeouts: L{ImportExportTimeouts}
    @param timeouts: Timeouts for this import
    @type cbs: L{ImportExportCbBase}
    @param cbs: Callbacks
    @param private: Private data for callback functions

    """
    _DiskImportExportBase.__init__(self, lu, node_name, opts, instance,
                                   component, timeouts, cbs, private)
    self._dest = dest
    self._dest_args = dest_args

    # Timestamp of when the daemon started listening
    self._ts_listening = None

  @property
  def listen_port(self):
    """Returns the port the daemon is listening on.

    """
    if not self._daemon:
      return None

    return self._daemon.listen_port

  def _StartDaemon(self):
    """Starts the import daemon.

    """
    return self._lu.rpc.call_import_start(self.node_name, self._opts,
                                          self._instance, self._component,
                                          (self._dest, self._dest_args))

  def CheckListening(self):
    """Checks whether the daemon is listening.

    @rtype: bool
    @return: Whether the daemon is listening

    """
    assert self._daemon, "Daemon status missing"

    if self._ts_listening is not None:
      return True

    port = self._daemon.listen_port
    if port is None:
      # Not listening yet; give up once the listen timeout has passed
      if utils.TimeoutExpired(self._ts_begin, self._timeouts.listen):
        raise _ImportExportError("Not listening after %s seconds" %
                                 self._timeouts.listen)
      return False

    self._ts_listening = time.time()

    logging.debug("Import '%s' on %s is now listening on port %s",
                  self._daemon_name, self.node_name, port)

    self._cbs.ReportListening(self, self._private, self._component)

    return True

  def _GetConnectedCheckEpoch(self):
    """Returns the time since we started listening.

    """
    assert self._ts_listening is not None, \
           ("Checking whether an import is connected is only useful"
            " once it's been listening")

    return self._ts_listening
573

    
574

    
575
class DiskExport(_DiskImportExportBase):
  """Handles the sending (export) side of a single disk transfer.

  """
  MODE_TEXT = "export"

  def __init__(self, lu, node_name, opts, dest_host, dest_port,
               instance, component, source, source_args,
               timeouts, cbs, private=None):
    """Initializes this class.

    @param lu: Logical unit instance
    @type node_name: string
    @param node_name: Node name for import
    @type opts: L{objects.ImportExportOptions}
    @param opts: Import/export daemon options
    @type dest_host: string
    @param dest_host: Destination host name or IP address
    @type dest_port: number
    @param dest_port: Destination port number
    @type instance: L{objects.Instance}
    @param instance: Instance object
    @type component: string
    @param component: which part of the instance is being imported
    @param source: I/O source
    @param source_args: I/O source arguments
    @type timeouts: L{ImportExportTimeouts}
    @param timeouts: Timeouts for this import
    @type cbs: L{ImportExportCbBase}
    @param cbs: Callbacks
    @param private: Private data for callback functions

    """
    _DiskImportExportBase.__init__(self, lu, node_name, opts, instance,
                                   component, timeouts, cbs, private)
    self._dest_host = dest_host
    self._dest_port = dest_port
    self._source = source
    self._source_args = source_args

  def _StartDaemon(self):
    """Starts the export daemon.

    """
    return self._lu.rpc.call_export_start(self.node_name, self._opts,
                                          self._dest_host, self._dest_port,
                                          self._instance, self._component,
                                          (self._source, self._source_args))

  def CheckListening(self):
    """Checks whether the daemon is listening.

    """
    # Only an import can be listening
    return True

  def _GetConnectedCheckEpoch(self):
    """Returns the time since the daemon started.

    """
    assert self._ts_begin is not None

    return self._ts_begin
635

    
636

    
637
def FormatProgress(progress):
  """Formats progress information for user consumption

  @type progress: tuple
  @param progress: Tuple of (mbytes, throughput, percent, eta); percent and
    eta may be C{None}

  """
  (mbytes, throughput, percent, eta) = progress

  parts = [utils.FormatUnit(mbytes, "h")]

  # Not using FormatUnit as it doesn't support kilobytes
  parts.append("%0.1f MiB/s" % throughput)

  if percent is not None:
    parts.append("%d%%" % percent)

  if eta is not None:
    parts.append("ETA %s" % utils.FormatSeconds(eta))

  return utils.CommaJoin(parts)
657

    
658

    
659
class ImportExportLoop(object):
  """Main loop driving a set of import/export daemons to completion.

  Objects are added via L{Add} and processed by L{Run} until all transfers
  have finished (or failed).

  """
  # Bounds for the polling delay between status checks, in seconds
  MIN_DELAY = 1.0
  MAX_DELAY = 20.0

  def __init__(self, lu):
    """Initializes this class.

    @param lu: Logical unit instance

    """
    self._lu = lu
    self._queue = []
    self._pending_add = []

  def Add(self, diskie):
    """Adds an import/export object to the loop.

    @type diskie: Subclass of L{_DiskImportExportBase}
    @param diskie: Import/export object

    """
    assert diskie not in self._pending_add
    assert diskie.loop is None

    diskie.SetLoop(self)

    # Adding new objects to a staging list is necessary, otherwise the main
    # loop gets confused if callbacks modify the queue while the main loop is
    # iterating over it.
    self._pending_add.append(diskie)

  @staticmethod
  def _CollectDaemonStatus(lu, daemons):
    """Collects the status for all import/export daemons.

    @type daemons: dict
    @param daemons: Mapping from node name to list of daemon names
    @rtype: dict
    @return: Mapping from node name to a dict of daemon name to status;
      nodes whose status RPC failed are omitted

    """
    daemon_status = {}

    for node_name, names in daemons.items():
      result = lu.rpc.call_impexp_status(node_name, names)
      if result.fail_msg:
        lu.LogWarning("Failed to get daemon status on %s: %s",
                      node_name, result.fail_msg)
        continue

      assert len(names) == len(result.payload)

      daemon_status[node_name] = dict(zip(names, result.payload))

    return daemon_status

  @staticmethod
  def _GetActiveDaemonNames(queue):
    """Gets the names of all active daemons.

    Starts daemons which aren't running yet; transfers whose daemon fails to
    start are finalized with an error.

    @rtype: dict
    @return: Mapping from node name to list of daemon names

    """
    result = {}
    for diskie in queue:
      if not diskie.active:
        continue

      try:
        # Start daemon if necessary
        daemon_name = diskie.CheckDaemon()
      except _ImportExportError as err:
        logging.exception("%s failed", diskie.MODE_TEXT)
        diskie.Finalize(error=str(err))
        continue

      result.setdefault(diskie.node_name, []).append(daemon_name)

    assert len(queue) >= len(result)
    assert len(queue) >= sum(len(names) for names in result.values())

    logging.debug("daemons=%r", result)

    return result

  def _AddPendingToQueue(self):
    """Adds all pending import/export objects to the internal queue.

    """
    assert compat.all(diskie not in self._queue and diskie.loop == self
                      for diskie in self._pending_add)

    self._queue.extend(self._pending_add)

    del self._pending_add[:]

  def Run(self):
    """Utility main loop.

    Polls all active daemons, updates their status and adjusts the polling
    delay until no active transfer remains.

    """
    while True:
      self._AddPendingToQueue()

      # Collect all active daemon names
      daemons = self._GetActiveDaemonNames(self._queue)
      if not daemons:
        break

      # Collect daemon status data
      data = self._CollectDaemonStatus(self._lu, daemons)

      # Use data
      delay = self.MAX_DELAY
      for diskie in self._queue:
        if not diskie.active:
          continue

        try:
          try:
            all_daemon_data = data[diskie.node_name]
          except KeyError:
            result = diskie.SetDaemonData(False, None)
          else:
            result = \
              diskie.SetDaemonData(True,
                                   all_daemon_data[diskie.GetDaemonName()])

          if not result:
            # Daemon not yet ready, retry soon
            delay = min(3.0, delay)
            continue

          if diskie.CheckFinished():
            # Transfer finished
            diskie.Finalize()
            continue

          # Normal case: check again in 5 seconds
          delay = min(5.0, delay)

          if not diskie.CheckListening():
            # Not yet listening, retry soon
            delay = min(1.0, delay)
            continue

          if not diskie.CheckConnected():
            # Not yet connected, retry soon
            delay = min(1.0, delay)
            continue

        except _ImportExportError as err:
          logging.exception("%s failed", diskie.MODE_TEXT)
          diskie.Finalize(error=str(err))

      if not compat.any(diskie.active for diskie in self._queue):
        break

      # Wait a bit
      delay = min(self.MAX_DELAY, max(self.MIN_DELAY, delay))
      logging.debug("Waiting for %ss", delay)
      time.sleep(delay)

  def FinalizeAll(self):
    """Finalizes all pending transfers.

    @rtype: bool
    @return: Whether all transfers were finalized successfully

    """
    success = True

    for diskie in self._queue:
      success = diskie.Finalize() and success

    return success
822

    
823

    
824
class _TransferInstCbBase(ImportExportCbBase):
  """Shared state for instance-transfer callback objects.

  """
  def __init__(self, lu, feedback_fn, instance, timeouts, src_node, src_cbs,
               dest_node, dest_ip):
    """Initializes this class.

    """
    ImportExportCbBase.__init__(self)

    self.lu = lu
    self.feedback_fn = feedback_fn
    self.instance = instance
    self.timeouts = timeouts
    self.src_node = src_node
    self.src_cbs = src_cbs
    self.dest_node = dest_node
    self.dest_ip = dest_ip
840

    
841

    
842
class _TransferInstSourceCb(_TransferInstCbBase):
  """Callbacks for the sending (export) side of an instance data transfer.

  """
  def ReportConnected(self, ie, dtp):
    """Called when a connection has been established.

    """
    assert self.src_cbs is None
    assert dtp.src_export == ie
    assert dtp.dest_import

    self.feedback_fn("%s is sending data on %s" %
                     (dtp.data.name, ie.node_name))

  def ReportProgress(self, ie, dtp):
    """Called when new progress information should be reported.

    """
    progress = ie.progress
    if progress:
      self.feedback_fn("%s sent %s" %
                       (dtp.data.name, FormatProgress(progress)))

  def ReportFinished(self, ie, dtp):
    """Called when a transfer has finished.

    """
    assert self.src_cbs is None
    assert dtp.src_export == ie
    assert dtp.dest_import

    if ie.success:
      self.feedback_fn("%s finished sending data" % dtp.data.name)
    else:
      self.feedback_fn("%s failed to send data: %s (recent output: %s)" %
                       (dtp.data.name, ie.final_message, ie.recent_output))

    dtp.RecordResult(ie.success)

    cb = dtp.data.finished_fn
    if cb:
      cb()

    # TODO: Check whether sending SIGTERM right away is okay, maybe we should
    # give the daemon a moment to sort things out
    if dtp.dest_import and not ie.success:
      dtp.dest_import.Abort()
888

    
889

    
890
class _TransferInstDestCb(_TransferInstCbBase):
  """Callbacks for the receiving (import) side of an instance data transfer.

  """
  def ReportListening(self, ie, dtp, component):
    """Called when daemon started listening.

    """
    assert self.src_cbs
    assert dtp.src_export is None
    assert dtp.dest_import
    assert dtp.export_opts

    self.feedback_fn("%s is now listening, starting export" % dtp.data.name)

    # The import side is ready, so fire up the matching export on the
    # source node
    exporter = DiskExport(self.lu, self.src_node, dtp.export_opts,
                          self.dest_ip, ie.listen_port, self.instance,
                          component, dtp.data.src_io, dtp.data.src_ioargs,
                          self.timeouts, self.src_cbs, private=dtp)
    ie.loop.Add(exporter)

    dtp.src_export = exporter

  def ReportConnected(self, ie, dtp):
    """Called when a connection has been established.

    """
    self.feedback_fn("%s is receiving data on %s" %
                     (dtp.data.name, self.dest_node))

  def ReportFinished(self, ie, dtp):
    """Called when a transfer has finished.

    """
    if ie.success:
      self.feedback_fn("%s finished receiving data" % dtp.data.name)
    else:
      self.feedback_fn("%s failed to receive data: %s (recent output: %s)" %
                       (dtp.data.name, ie.final_message, ie.recent_output))

    dtp.RecordResult(ie.success)

    # TODO: Check whether sending SIGTERM right away is okay, maybe we should
    # give the daemon a moment to sort things out
    if dtp.src_export and not ie.success:
      dtp.src_export.Abort()
934

    
935

    
936
class DiskTransfer(object):
  """Describes a single disk transfer between two nodes.

  """
  def __init__(self, name, src_io, src_ioargs, dest_io, dest_ioargs,
               finished_fn):
    """Initializes this class.

    @type name: string
    @param name: User-visible name for this transfer (e.g. "disk/0")
    @param src_io: Source I/O type
    @param src_ioargs: Source I/O arguments
    @param dest_io: Destination I/O type
    @param dest_ioargs: Destination I/O arguments
    @type finished_fn: callable
    @param finished_fn: Function called once transfer has finished

    """
    self.name = name
    self.src_io = src_io
    self.src_ioargs = src_ioargs
    self.dest_io = dest_io
    self.dest_ioargs = dest_ioargs
    self.finished_fn = finished_fn
960

    
961

    
962
class _DiskTransferPrivate(object):
963
  def __init__(self, data, success, export_opts):
964
    """Initializes this class.
965

966
    @type data: L{DiskTransfer}
967
    @type success: bool
968

969
    """
970
    self.data = data
971
    self.success = success
972
    self.export_opts = export_opts
973

    
974
    self.src_export = None
975
    self.dest_import = None
976

    
977
  def RecordResult(self, success):
978
    """Updates the status.
979

980
    One failed part will cause the whole transfer to fail.
981

982
    """
983
    self.success = self.success and success
984

    
985

    
986
def _GetInstDiskMagic(base, instance_name, index):
  """Computes the magic value for a disk export or import.

  @type base: string
  @param base: Random seed value (can be the same for all disks of a transfer)
  @type instance_name: string
  @param instance_name: Name of instance
  @type index: number
  @param index: Disk index
  @rtype: string
  @return: Hex digest of the combined values

  """
  h = compat.sha1_hash()
  # Order of the parts matters; it must match the peer's computation
  for part in (str(constants.RIE_VERSION), base, instance_name, str(index)):
    h.update(part)
  return h.hexdigest()
1003

    
1004

    
1005
def TransferInstanceData(lu, feedback_fn, src_node, dest_node, dest_ip,
                         instance, all_transfers):
  """Transfers an instance's data from one node to another.

  @param lu: Logical unit instance
  @param feedback_fn: Feedback function
  @type src_node: string
  @param src_node: Source node name
  @type dest_node: string
  @param dest_node: Destination node name
  @type dest_ip: string
  @param dest_ip: IP address of destination node
  @type instance: L{objects.Instance}
  @param instance: Instance object
  @type all_transfers: list of L{DiskTransfer} instances
  @param all_transfers: List of all disk transfers to be made
  @rtype: list
  @return: List with a boolean (True=successful, False=failed) for success for
           each transfer

  """
  # Disable compression for all moves as these are all within the same cluster
  compress = constants.IEC_NONE

  logging.debug("Source node %s, destination node %s, compression '%s'",
                src_node, dest_node, compress)

  timeouts = ImportExportTimeouts(constants.DISK_TRANSFER_CONNECT_TIMEOUT)
  # The source callback is created first (without a peer) and then linked
  # into the destination callback, which drives the export side
  src_cbs = _TransferInstSourceCb(lu, feedback_fn, instance, timeouts,
                                  src_node, None, dest_node, dest_ip)
  dest_cbs = _TransferInstDestCb(lu, feedback_fn, instance, timeouts,
                                 src_node, src_cbs, dest_node, dest_ip)

  # One _DiskTransferPrivate per entry in all_transfers, in the same order
  all_dtp = []

  # Shared random seed for all per-disk magic values of this operation
  base_magic = utils.GenerateSecret(6)

  ieloop = ImportExportLoop(lu)
  try:
    for idx, transfer in enumerate(all_transfers):
      if transfer:
        feedback_fn("Exporting %s from %s to %s" %
                    (transfer.name, src_node, dest_node))

        magic = _GetInstDiskMagic(base_magic, instance.name, idx)
        # Intra-cluster move: no X509 key or CA is used
        opts = objects.ImportExportOptions(key_name=None, ca_pem=None,
                                           compress=compress, magic=magic)

        dtp = _DiskTransferPrivate(transfer, True, opts)

        di = DiskImport(lu, dest_node, opts, instance, "disk%d" % idx,
                        transfer.dest_io, transfer.dest_ioargs,
                        timeouts, dest_cbs, private=dtp)
        ieloop.Add(di)

        dtp.dest_import = di
      else:
        # Skipped disk: placeholder that is reported as failed
        dtp = _DiskTransferPrivate(None, False, None)

      all_dtp.append(dtp)

    ieloop.Run()
  finally:
    # Always finalize daemons, even if the loop raised
    ieloop.FinalizeAll()

  assert len(all_dtp) == len(all_transfers)
  assert compat.all((dtp.src_export is None or
                      dtp.src_export.success is not None) and
                     (dtp.dest_import is None or
                      dtp.dest_import.success is not None)
                     for dtp in all_dtp), \
         "Not all imports/exports are finalized"

  return [bool(dtp.success) for dtp in all_dtp]
class _RemoteExportCb(ImportExportCbBase):
  """Callbacks for exporting disks to a remote cluster.

  """
  def __init__(self, feedback_fn, disk_count):
    """Initializes this class.

    """
    ImportExportCbBase.__init__(self)
    self._feedback_fn = feedback_fn
    # One result slot per disk, filled in by L{ReportFinished}
    self._dresults = [None for _ in range(disk_count)]

  @property
  def disk_results(self):
    """Returns per-disk results.

    """
    return self._dresults

  def ReportConnected(self, ie, private):
    """Called when a connection has been established.

    """
    (idx, _) = private
    self._feedback_fn("Disk %s is now sending data" % idx)

  def ReportProgress(self, ie, private):
    """Called when new progress information should be reported.

    """
    (idx, _) = private
    progress = ie.progress
    if progress:
      self._feedback_fn("Disk %s sent %s" % (idx, FormatProgress(progress)))

  def ReportFinished(self, ie, private):
    """Called when a transfer has finished.

    """
    (idx, finished_fn) = private

    if ie.success:
      msg = "Disk %s finished sending data" % idx
    else:
      msg = ("Disk %s failed to send data: %s (recent output: %s)" %
             (idx, ie.final_message, ie.recent_output))
    self._feedback_fn(msg)

    self._dresults[idx] = bool(ie.success)

    if finished_fn:
      finished_fn()
class ExportInstanceHelper:
  def __init__(self, lu, feedback_fn, instance):
    """Initializes this class.

    @param lu: Logical unit instance
    @param feedback_fn: Feedback function
    @type instance: L{objects.Instance}
    @param instance: Instance object

    """
    self._lu = lu
    self._feedback_fn = feedback_fn
    self._instance = instance

    # Snapshot disk objects (or False for failed snapshots), one entry per
    # instance disk, filled in by L{CreateSnapshots}
    self._snap_disks = []
    # Tracks which snapshots have already been removed again
    self._removed_snaps = [False] * len(instance.disks)

  def CreateSnapshots(self):
    """Creates an LVM snapshot for every disk of the instance.

    """
    assert not self._snap_disks

    instance = self._instance
    src_node = instance.primary_node

    for idx, disk in enumerate(instance.disks):
      self._feedback_fn("Creating a snapshot of disk/%s on node %s" %
                        (idx, src_node))

      # result.payload will be a snapshot of an lvm leaf of the one we
      # passed
      result = self._lu.rpc.call_blockdev_snapshot(src_node, disk)
      new_dev = False
      msg = result.fail_msg
      if msg:
        self._lu.LogWarning("Could not snapshot disk/%s on node %s: %s",
                            idx, src_node, msg)
      elif (not isinstance(result.payload, (tuple, list)) or
            len(result.payload) != 2):
        self._lu.LogWarning("Could not snapshot disk/%s on node %s: invalid"
                            " result '%s'", idx, src_node, result.payload)
      else:
        disk_id = tuple(result.payload)
        new_dev = objects.Disk(dev_type=constants.LD_LV, size=disk.size,
                               logical_id=disk_id, physical_id=disk_id,
                               iv_name=disk.iv_name)

      # A failed snapshot is recorded as False so indexes stay aligned
      self._snap_disks.append(new_dev)

    assert len(self._snap_disks) == len(instance.disks)
    assert len(self._removed_snaps) == len(instance.disks)

  def _RemoveSnapshot(self, disk_index):
    """Removes an LVM snapshot.

    Safe to call more than once; removal is only attempted for snapshots
    that exist and have not been removed yet.

    @type disk_index: number
    @param disk_index: Index of the snapshot to be removed

    """
    disk = self._snap_disks[disk_index]
    if disk and not self._removed_snaps[disk_index]:
      src_node = self._instance.primary_node

      self._feedback_fn("Removing snapshot of disk/%s on node %s" %
                        (disk_index, src_node))

      result = self._lu.rpc.call_blockdev_remove(src_node, disk)
      if result.fail_msg:
        # Only log; a later call may still succeed in removing it
        self._lu.LogWarning("Could not remove snapshot for disk/%d from node"
                            " %s: %s", disk_index, src_node, result.fail_msg)
      else:
        self._removed_snaps[disk_index] = True

  def LocalExport(self, dest_node):
    """Intra-cluster instance export.

    @type dest_node: L{objects.Node}
    @param dest_node: Destination node

    """
    instance = self._instance
    src_node = instance.primary_node

    # L{CreateSnapshots} must have been called before
    assert len(self._snap_disks) == len(instance.disks)

    transfers = []

    for idx, dev in enumerate(self._snap_disks):
      if not dev:
        # Failed snapshot; nothing to transfer for this disk
        transfers.append(None)
        continue

      path = utils.PathJoin(constants.EXPORT_DIR, "%s.new" % instance.name,
                            dev.physical_id[1])

      finished_fn = compat.partial(self._TransferFinished, idx)

      # FIXME: pass debug option from opcode to backend
      dt = DiskTransfer("snapshot/%s" % idx,
                        constants.IEIO_SCRIPT, (dev, idx),
                        constants.IEIO_FILE, (path, ),
                        finished_fn)
      transfers.append(dt)

    # Actually export data
    dresults = TransferInstanceData(self._lu, self._feedback_fn,
                                    src_node, dest_node.name,
                                    dest_node.secondary_ip,
                                    instance, transfers)

    assert len(dresults) == len(instance.disks)

    self._feedback_fn("Finalizing export on %s" % dest_node.name)
    result = self._lu.rpc.call_finalize_export(dest_node.name, instance,
                                               self._snap_disks)
    msg = result.fail_msg
    fin_resu = not msg
    if msg:
      self._lu.LogWarning("Could not finalize export for instance %s"
                          " on node %s: %s", instance.name, dest_node.name, msg)

    return (fin_resu, dresults)

  def RemoteExport(self, disk_info, key_name, dest_ca_pem, timeouts):
    """Inter-cluster instance export.

    @type disk_info: list
    @param disk_info: Per-disk destination information
    @type key_name: string
    @param key_name: Name of X509 key to use
    @type dest_ca_pem: string
    @param dest_ca_pem: Destination X509 CA in PEM format
    @type timeouts: L{ImportExportTimeouts}
    @param timeouts: Timeouts for this import

    """
    instance = self._instance

    assert len(disk_info) == len(instance.disks)

    cbs = _RemoteExportCb(self._feedback_fn, len(instance.disks))

    ieloop = ImportExportLoop(self._lu)
    try:
      for idx, (dev, (host, port, magic)) in enumerate(zip(instance.disks,
                                                           disk_info)):
        # Decide whether to use IPv6
        ipv6 = netutils.IP6Address.IsValid(host)

        opts = objects.ImportExportOptions(key_name=key_name,
                                           ca_pem=dest_ca_pem,
                                           magic=magic, ipv6=ipv6)

        self._feedback_fn("Sending disk %s to %s:%s" % (idx, host, port))
        finished_fn = compat.partial(self._TransferFinished, idx)
        ieloop.Add(DiskExport(self._lu, instance.primary_node,
                              opts, host, port, instance, "disk%d" % idx,
                              constants.IEIO_SCRIPT, (dev, idx),
                              timeouts, cbs, private=(idx, finished_fn)))

      ieloop.Run()
    finally:
      # Always finalize daemons, even if the loop raised
      ieloop.FinalizeAll()

    # NOTE(review): overall status is always True here; per-disk status is
    # carried in cbs.disk_results
    return (True, cbs.disk_results)

  def _TransferFinished(self, idx):
    """Called once a transfer has finished.

    Removes the snapshot for the transferred disk as soon as it's no
    longer needed.

    @type idx: number
    @param idx: Disk index

    """
    logging.debug("Transfer %s finished", idx)
    self._RemoveSnapshot(idx)

  def Cleanup(self):
    """Remove all snapshots.

    """
    assert len(self._removed_snaps) == len(self._instance.disks)
    for idx in range(len(self._instance.disks)):
      self._RemoveSnapshot(idx)
class _RemoteImportCb(ImportExportCbBase):
  """Callbacks for importing disks from a remote cluster.

  """
  def __init__(self, feedback_fn, cds, x509_cert_pem, disk_count,
               external_address):
    """Initializes this class.

    @type cds: string
    @param cds: Cluster domain secret
    @type x509_cert_pem: string
    @param x509_cert_pem: CA used for signing import key
    @type disk_count: number
    @param disk_count: Number of disks
    @type external_address: string
    @param external_address: External address of destination node

    """
    ImportExportCbBase.__init__(self)

    self._feedback_fn = feedback_fn
    self._cds = cds
    self._x509_cert_pem = x509_cert_pem
    self._disk_count = disk_count
    self._external_address = external_address

    # Per-disk state, filled in as the import daemons report back
    self._dresults = [None for _ in range(disk_count)]
    self._daemon_port = [None for _ in range(disk_count)]

    self._salt = utils.GenerateSecret(8)

  @property
  def disk_results(self):
    """Returns per-disk results.

    """
    return self._dresults

  def _CheckAllListening(self):
    """Checks whether all daemons are listening.

    If all daemons are listening, the information is sent to the client.

    """
    if not compat.all(dp is not None for dp in self._daemon_port):
      # At least one daemon hasn't reported its port yet
      return

    host = self._external_address

    disks = [ComputeRemoteImportDiskInfo(self._cds, self._salt,
                                         idx, host, port, magic)
             for idx, (port, magic) in enumerate(self._daemon_port)]

    assert len(disks) == self._disk_count

    self._feedback_fn(constants.ELOG_REMOTE_IMPORT, {
      "disks": disks,
      "x509_ca": self._x509_cert_pem,
      })

  def ReportListening(self, ie, private, _):
    """Called when daemon started listening.

    """
    (idx, ) = private

    self._feedback_fn("Disk %s is now listening" % idx)

    assert self._daemon_port[idx] is None
    self._daemon_port[idx] = (ie.listen_port, ie.magic)

    # Once every daemon is listening, connection details go to the client
    self._CheckAllListening()

  def ReportConnected(self, ie, private):
    """Called when a connection has been established.

    """
    (idx, ) = private
    self._feedback_fn("Disk %s is now receiving data" % idx)

  def ReportFinished(self, ie, private):
    """Called when a transfer has finished.

    """
    (idx, ) = private

    # Daemon is certainly no longer listening
    self._daemon_port[idx] = None

    if ie.success:
      self._feedback_fn("Disk %s finished receiving data" % idx)
    else:
      self._feedback_fn(("Disk %s failed to receive data: %s"
                         " (recent output: %s)") %
                        (idx, ie.final_message, ie.recent_output))

    self._dresults[idx] = bool(ie.success)
def RemoteImport(lu, feedback_fn, instance, pnode, source_x509_ca,
                 cds, timeouts):
  """Imports an instance from another cluster.

  @param lu: Logical unit instance
  @param feedback_fn: Feedback function
  @type instance: L{objects.Instance}
  @param instance: Instance object
  @type pnode: L{objects.Node}
  @param pnode: Primary node of instance as an object
  @type source_x509_ca: OpenSSL.crypto.X509
  @param source_x509_ca: Import source's X509 CA
  @type cds: string
  @param cds: Cluster domain secret
  @type timeouts: L{ImportExportTimeouts}
  @param timeouts: Timeouts for this import

  """
  source_ca_pem = OpenSSL.crypto.dump_certificate(OpenSSL.crypto.FILETYPE_PEM,
                                                  source_x509_ca)

  # Shared random seed for all per-disk magic values
  magic_base = utils.GenerateSecret(6)

  # Decide whether to use IPv6
  ipv6 = netutils.IP6Address.IsValid(pnode.primary_ip)

  # Create crypto key
  result = lu.rpc.call_x509_cert_create(instance.primary_node,
                                        constants.RIE_CERT_VALIDITY)
  result.Raise("Can't create X509 key and certificate on %s" % result.node)

  (x509_key_name, x509_cert_pem) = result.payload
  try:
    # Load certificate
    x509_cert = OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_PEM,
                                                x509_cert_pem)

    # Sign certificate
    signed_x509_cert_pem = \
      utils.SignX509Certificate(x509_cert, cds, utils.GenerateSecret(8))

    cbs = _RemoteImportCb(feedback_fn, cds, signed_x509_cert_pem,
                          len(instance.disks), pnode.primary_ip)

    ieloop = ImportExportLoop(lu)
    try:
      for idx, dev in enumerate(instance.disks):
        magic = _GetInstDiskMagic(magic_base, instance.name, idx)

        # Import daemon options
        opts = objects.ImportExportOptions(key_name=x509_key_name,
                                           ca_pem=source_ca_pem,
                                           magic=magic, ipv6=ipv6)

        ieloop.Add(DiskImport(lu, instance.primary_node, opts, instance,
                              "disk%d" % idx,
                              constants.IEIO_SCRIPT, (dev, idx),
                              timeouts, cbs, private=(idx, )))

      ieloop.Run()
    finally:
      # Always finalize daemons, even if the loop raised
      ieloop.FinalizeAll()
  finally:
    # Remove crypto key and certificate; done unconditionally so the
    # temporary key never outlives the import
    result = lu.rpc.call_x509_cert_remove(instance.primary_node, x509_key_name)
    result.Raise("Can't remove X509 key and certificate on %s" % result.node)

  return cbs.disk_results
def _GetImportExportHandshakeMessage(version):
  """Returns the handshake message for a RIE protocol version.

  @type version: number

  """
  return ":".join([str(version), constants.RIE_HANDSHAKE])
def ComputeRemoteExportHandshake(cds):
  """Computes the remote import/export handshake.

  The result is a (version, HMAC digest, HMAC salt) tuple.

  @type cds: string
  @param cds: Cluster domain secret

  """
  salt = utils.GenerateSecret(8)
  digest = utils.Sha1Hmac(cds,
                          _GetImportExportHandshakeMessage(
                            constants.RIE_VERSION),
                          salt=salt)
  return (constants.RIE_VERSION, digest, salt)
def CheckRemoteExportHandshake(cds, handshake):
1511
  """Checks the handshake of a remote import/export.
1512

1513
  @type cds: string
1514
  @param cds: Cluster domain secret
1515
  @type handshake: sequence
1516
  @param handshake: Handshake sent by remote peer
1517

1518
  """
1519
  try:
1520
    (version, hmac_digest, hmac_salt) = handshake
1521
  except (TypeError, ValueError), err:
1522
    return "Invalid data: %s" % err
1523

    
1524
  if not utils.VerifySha1Hmac(cds, _GetImportExportHandshakeMessage(version),
1525
                              hmac_digest, salt=hmac_salt):
1526
    return "Hash didn't match, clusters don't share the same domain secret"
1527

    
1528
  if version != constants.RIE_VERSION:
1529
    return ("Clusters don't have the same remote import/export protocol"
1530
            " (local=%s, remote=%s)" %
1531
            (constants.RIE_VERSION, version))
1532

    
1533
  return None
1534

    
1535

    
1536
def _GetRieDiskInfoMessage(disk_index, host, port, magic):
1537
  """Returns the hashed text for import/export disk information.
1538

1539
  @type disk_index: number
1540
  @param disk_index: Index of disk (included in hash)
1541
  @type host: string
1542
  @param host: Hostname
1543
  @type port: number
1544
  @param port: Daemon port
1545
  @type magic: string
1546
  @param magic: Magic value
1547

1548
  """
1549
  return "%s:%s:%s:%s" % (disk_index, host, port, magic)
1550

    
1551

    
1552
def CheckRemoteExportDiskInfo(cds, disk_index, disk_info):
1553
  """Verifies received disk information for an export.
1554

1555
  @type cds: string
1556
  @param cds: Cluster domain secret
1557
  @type disk_index: number
1558
  @param disk_index: Index of disk (included in hash)
1559
  @type disk_info: sequence
1560
  @param disk_info: Disk information sent by remote peer
1561

1562
  """
1563
  try:
1564
    (host, port, magic, hmac_digest, hmac_salt) = disk_info
1565
  except (TypeError, ValueError), err:
1566
    raise errors.GenericError("Invalid data: %s" % err)
1567

    
1568
  if not (host and port and magic):
1569
    raise errors.GenericError("Missing destination host, port or magic")
1570

    
1571
  msg = _GetRieDiskInfoMessage(disk_index, host, port, magic)
1572

    
1573
  if not utils.VerifySha1Hmac(cds, msg, hmac_digest, salt=hmac_salt):
1574
    raise errors.GenericError("HMAC is wrong")
1575

    
1576
  if netutils.IP6Address.IsValid(host) or netutils.IP4Address.IsValid(host):
1577
    destination = host
1578
  else:
1579
    destination = netutils.Hostname.GetNormalizedName(host)
1580

    
1581
  return (destination,
1582
          utils.ValidateServiceName(port),
1583
          magic)
1584

    
1585

    
1586
def ComputeRemoteImportDiskInfo(cds, salt, disk_index, host, port, magic):
  """Computes the signed disk information for a remote import.

  The result has the shape expected by L{CheckRemoteExportDiskInfo} on
  the peer cluster.

  @type cds: string
  @param cds: Cluster domain secret
  @type salt: string
  @param salt: HMAC salt
  @type disk_index: number
  @param disk_index: Index of disk (included in hash)
  @type host: string
  @param host: Hostname
  @type port: number
  @param port: Daemon port
  @type magic: string
  @param magic: Magic value

  """
  digest = utils.Sha1Hmac(cds,
                          _GetRieDiskInfoMessage(disk_index, host, port,
                                                 magic),
                          salt=salt)
  return (host, port, magic, digest, salt)