Statistics
| Branch: | Tag: | Revision:

root / lib / masterd / instance.py @ bc5d0215

History | View | Annotate | Download (44.8 kB)

1
#
2
#
3

    
4
# Copyright (C) 2010, 2011 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Instance-related functions and classes for masterd.
23

24
"""
25

    
26
import logging
27
import time
28
import OpenSSL
29

    
30
from ganeti import constants
31
from ganeti import errors
32
from ganeti import compat
33
from ganeti import utils
34
from ganeti import objects
35
from ganeti import netutils
36

    
37

    
38
class _ImportExportError(Exception):
  """Local exception to report import/export errors.

  Only used inside this module; callers see the message via log
  warnings and the transfer's final status.

  """
42

    
43

    
44
class ImportExportTimeouts(object):
  """Holds the various timeouts for a single import/export operation.

  """
  #: Time until daemon starts writing status file
  DEFAULT_READY_TIMEOUT = 10

  #: Length of time until errors cause hard failure
  DEFAULT_ERROR_TIMEOUT = 10

  #: Time after which daemon must be listening
  DEFAULT_LISTEN_TIMEOUT = 10

  #: Progress update interval
  DEFAULT_PROGRESS_INTERVAL = 60

  __slots__ = [
    "error",
    "ready",
    "listen",
    "connect",
    "progress",
    ]

  def __init__(self, connect,
               listen=DEFAULT_LISTEN_TIMEOUT,
               error=DEFAULT_ERROR_TIMEOUT,
               ready=DEFAULT_READY_TIMEOUT,
               progress=DEFAULT_PROGRESS_INTERVAL):
    """Initializes this class.

    @type connect: number
    @param connect: Timeout for establishing connection
    @type listen: number
    @param listen: Timeout for starting to listen for connections
    @type error: number
    @param error: Length of time until errors cause hard failure
    @type ready: number
    @param ready: Timeout for daemon to become ready
    @type progress: number
    @param progress: Progress update interval

    """
    # Only "connect" is mandatory; the others fall back to the class-level
    # defaults above
    self.connect = connect
    self.listen = listen
    self.ready = ready
    self.error = error
    self.progress = progress
89

    
90

    
91
class ImportExportCbBase(object):
  """Callbacks for disk import/export.

  The default implementations do nothing; subclasses override the
  notifications they are interested in.

  """
  def ReportListening(self, ie, private, component):
    """Called when daemon started listening.

    @type ie: Subclass of L{_DiskImportExportBase}
    @param ie: Import/export object
    @param private: Private data passed to import/export object
    @param component: transfer component name

    """

  def ReportConnected(self, ie, private):
    """Called when a connection has been established.

    @type ie: Subclass of L{_DiskImportExportBase}
    @param ie: Import/export object
    @param private: Private data passed to import/export object

    """

  def ReportProgress(self, ie, private):
    """Called when new progress information should be reported.

    @type ie: Subclass of L{_DiskImportExportBase}
    @param ie: Import/export object
    @param private: Private data passed to import/export object

    """

  def ReportFinished(self, ie, private):
    """Called when a transfer has finished.

    @type ie: Subclass of L{_DiskImportExportBase}
    @param ie: Import/export object
    @param private: Private data passed to import/export object

    """
131

    
132

    
133
class _DiskImportExportBase(object):
  """Base class for a single disk import or export.

  Manages the lifecycle of one remote import/export daemon: starting it
  via RPC, polling its status data, checking the listen/connect/finish
  state, and finally cleaning it up. Events are reported through the
  L{ImportExportCbBase} callbacks passed at construction time.

  """
  #: Mode name used in log messages; set to "import"/"export" by subclasses
  MODE_TEXT = None

  def __init__(self, lu, node_name, opts,
               instance, component, timeouts, cbs, private=None):
    """Initializes this class.

    @param lu: Logical unit instance
    @type node_name: string
    @param node_name: Node name for import
    @type opts: L{objects.ImportExportOptions}
    @param opts: Import/export daemon options
    @type instance: L{objects.Instance}
    @param instance: Instance object
    @type component: string
    @param component: which part of the instance is being imported
    @type timeouts: L{ImportExportTimeouts}
    @param timeouts: Timeouts for this import
    @type cbs: L{ImportExportCbBase}
    @param cbs: Callbacks
    @param private: Private data for callback functions

    """
    assert self.MODE_TEXT

    self._lu = lu
    self.node_name = node_name
    # Work on a copy, as the connect timeout is modified below
    self._opts = opts.Copy()
    self._instance = instance
    self._component = component
    self._timeouts = timeouts
    self._cbs = cbs
    self._private = private

    # Set master daemon's timeout in options for import/export daemon
    assert self._opts.connect_timeout is None
    self._opts.connect_timeout = timeouts.connect

    # Parent loop
    self._loop = None

    # Timestamps
    self._ts_begin = None
    self._ts_connected = None
    self._ts_finished = None
    self._ts_cleanup = None
    self._ts_last_progress = None
    self._ts_last_error = None

    # Transfer status; C{success} stays C{None} while the transfer is active
    self.success = None
    self.final_message = None

    # Daemon status
    self._daemon_name = None
    self._daemon = None

  @property
  def recent_output(self):
    """Returns the most recent output from the daemon.

    @rtype: string or None
    @return: Newline-joined output lines, or C{None} if no status data
      has been received yet

    """
    if self._daemon:
      return "\n".join(self._daemon.recent_output)

    return None

  @property
  def progress(self):
    """Returns transfer progress information.

    @rtype: tuple or None
    @return: (mbytes, throughput, percent, eta), or C{None} if no status
      data has been received yet

    """
    if not self._daemon:
      return None

    return (self._daemon.progress_mbytes,
            self._daemon.progress_throughput,
            self._daemon.progress_percent,
            self._daemon.progress_eta)

  @property
  def magic(self):
    """Returns the magic value for this import/export.

    """
    return self._opts.magic

  @property
  def active(self):
    """Determines whether this transport is still active.

    @rtype: bool

    """
    # C{success} is set (True or False) once the transfer finished
    return self.success is None

  @property
  def loop(self):
    """Returns parent loop.

    @rtype: L{ImportExportLoop}

    """
    return self._loop

  def SetLoop(self, loop):
    """Sets the parent loop.

    @type loop: L{ImportExportLoop}
    @raise errors.ProgrammerError: If a loop was already set

    """
    if self._loop:
      raise errors.ProgrammerError("Loop can only be set once")

    self._loop = loop

  def _StartDaemon(self):
    """Starts the import/export daemon.

    Must be implemented by subclasses to make the mode-specific RPC call.

    """
    raise NotImplementedError()

  def CheckDaemon(self):
    """Checks whether daemon has been started and if not, starts it.

    @rtype: string
    @return: Daemon name
    @raise _ImportExportError: If the daemon could not be started

    """
    assert self._ts_cleanup is None

    if self._daemon_name is None:
      assert self._ts_begin is None

      result = self._StartDaemon()
      if result.fail_msg:
        raise _ImportExportError("Failed to start %s on %s: %s" %
                                 (self.MODE_TEXT, self.node_name,
                                  result.fail_msg))

      daemon_name = result.payload

      logging.info("Started %s '%s' on %s", self.MODE_TEXT, daemon_name,
                   self.node_name)

      self._ts_begin = time.time()
      self._daemon_name = daemon_name

    return self._daemon_name

  def GetDaemonName(self):
    """Returns the daemon name.

    @rtype: string

    """
    assert self._daemon_name, "Daemon has not been started"
    assert self._ts_cleanup is None
    return self._daemon_name

  def Abort(self):
    """Sends SIGTERM to import/export daemon (if still active).

    @rtype: bool
    @return: Whether the abort RPC succeeded (also C{True} when no
      daemon was running)

    """
    if self._daemon_name:
      self._lu.LogWarning("Aborting %s '%s' on %s",
                          self.MODE_TEXT, self._daemon_name, self.node_name)
      result = self._lu.rpc.call_impexp_abort(self.node_name, self._daemon_name)
      if result.fail_msg:
        self._lu.LogWarning("Failed to abort %s '%s' on %s: %s",
                            self.MODE_TEXT, self._daemon_name,
                            self.node_name, result.fail_msg)
        return False

    return True

  def _SetDaemonData(self, data):
    """Internal function for updating status daemon data.

    @type data: L{objects.ImportExportStatus}
    @param data: Daemon status data
    @rtype: bool
    @return: Whether status data was available
    @raise _ImportExportError: If the daemon didn't become ready in time

    """
    assert self._ts_begin is not None

    if not data:
      # No status data yet; fail hard once the ready timeout has passed
      if utils.TimeoutExpired(self._ts_begin, self._timeouts.ready):
        raise _ImportExportError("Didn't become ready after %s seconds" %
                                 self._timeouts.ready)

      return False

    self._daemon = data

    return True

  def SetDaemonData(self, success, data):
    """Updates daemon status data.

    @type success: bool
    @param success: Whether fetching data was successful or not
    @type data: L{objects.ImportExportStatus}
    @param data: Daemon status data
    @rtype: bool
    @return: Whether the data was stored

    """
    if not success:
      # Remember when errors started; failures within the error timeout are
      # tolerated, afterwards the transfer fails hard
      if self._ts_last_error is None:
        self._ts_last_error = time.time()

      elif utils.TimeoutExpired(self._ts_last_error, self._timeouts.error):
        raise _ImportExportError("Too many errors while updating data")

      return False

    self._ts_last_error = None

    return self._SetDaemonData(data)

  def CheckListening(self):
    """Checks whether the daemon is listening.

    """
    raise NotImplementedError()

  def _GetConnectedCheckEpoch(self):
    """Returns timeout to calculate connect timeout.

    """
    raise NotImplementedError()

  def CheckConnected(self):
    """Checks whether the daemon is connected.

    @rtype: bool
    @return: Whether the daemon is connected
    @raise _ImportExportError: If the connect timeout expired

    """
    assert self._daemon, "Daemon status missing"

    if self._ts_connected is not None:
      return True

    if self._daemon.connected:
      self._ts_connected = time.time()

      # TODO: Log remote peer
      logging.debug("%s '%s' on %s is now connected",
                    self.MODE_TEXT, self._daemon_name, self.node_name)

      self._cbs.ReportConnected(self, self._private)

      return True

    if utils.TimeoutExpired(self._GetConnectedCheckEpoch(),
                            self._timeouts.connect):
      raise _ImportExportError("Not connected after %s seconds" %
                               self._timeouts.connect)

    return False

  def _CheckProgress(self):
    """Checks whether a progress update should be reported.

    Reports only when the progress interval has expired and actual
    progress data is available from the daemon.

    """
    if ((self._ts_last_progress is None or
        utils.TimeoutExpired(self._ts_last_progress,
                             self._timeouts.progress)) and
        self._daemon and
        self._daemon.progress_mbytes is not None and
        self._daemon.progress_throughput is not None):
      self._cbs.ReportProgress(self, self._private)
      self._ts_last_progress = time.time()

  def CheckFinished(self):
    """Checks whether the daemon exited.

    @rtype: bool
    @return: Whether the transfer is finished

    """
    assert self._daemon, "Daemon status missing"

    if self._ts_finished:
      return True

    if self._daemon.exit_status is None:
      # Still running; this is a good moment for a progress update
      # TODO: Adjust delay for ETA expiring soon
      self._CheckProgress()
      return False

    self._ts_finished = time.time()

    self._ReportFinished(self._daemon.exit_status == 0,
                         self._daemon.error_message)

    return True

  def _ReportFinished(self, success, message):
    """Transfer is finished or daemon exited.

    @type success: bool
    @param success: Whether the transfer was successful
    @type message: string
    @param message: Error message

    """
    assert self.success is None

    self.success = success
    self.final_message = message

    if success:
      logging.info("%s '%s' on %s succeeded", self.MODE_TEXT,
                   self._daemon_name, self.node_name)
    elif self._daemon_name:
      self._lu.LogWarning("%s '%s' on %s failed: %s",
                          self.MODE_TEXT, self._daemon_name, self.node_name,
                          message)
    else:
      # Daemon was never (successfully) started
      self._lu.LogWarning("%s on %s failed: %s", self.MODE_TEXT,
                          self.node_name, message)

    self._cbs.ReportFinished(self, self._private)

  def _Finalize(self):
    """Makes the RPC call to finalize this import/export.

    """
    return self._lu.rpc.call_impexp_cleanup(self.node_name, self._daemon_name)

  def Finalize(self, error=None):
    """Finalizes this import/export.

    @type error: string
    @param error: Optional error message to report
    @rtype: bool
    @return: Whether cleanup was successful

    """
    if self._daemon_name:
      logging.info("Finalizing %s '%s' on %s",
                   self.MODE_TEXT, self._daemon_name, self.node_name)

      result = self._Finalize()
      if result.fail_msg:
        self._lu.LogWarning("Failed to finalize %s '%s' on %s: %s",
                            self.MODE_TEXT, self._daemon_name,
                            self.node_name, result.fail_msg)
        return False

      # Daemon is no longer running
      self._daemon_name = None
      self._ts_cleanup = time.time()

    if error:
      self._ReportFinished(False, error)

    return True
482

    
483

    
484
class DiskImport(_DiskImportExportBase):
  """Wrapper for a single disk import daemon.

  """
  MODE_TEXT = "import"

  def __init__(self, lu, node_name, opts, instance, component,
               dest, dest_args, timeouts, cbs, private=None):
    """Initializes this class.

    @param lu: Logical unit instance
    @type node_name: string
    @param node_name: Node name for import
    @type opts: L{objects.ImportExportOptions}
    @param opts: Import/export daemon options
    @type instance: L{objects.Instance}
    @param instance: Instance object
    @type component: string
    @param component: which part of the instance is being imported
    @param dest: I/O destination
    @param dest_args: I/O arguments
    @type timeouts: L{ImportExportTimeouts}
    @param timeouts: Timeouts for this import
    @type cbs: L{ImportExportCbBase}
    @param cbs: Callbacks
    @param private: Private data for callback functions

    """
    _DiskImportExportBase.__init__(self, lu, node_name, opts, instance,
                                   component, timeouts, cbs, private)
    self._dest = dest
    self._dest_args = dest_args

    # Timestamps
    self._ts_listening = None

  @property
  def listen_port(self):
    """Returns the port the daemon is listening on.

    @rtype: number or None
    @return: Port number, or C{None} if no status data has been
      received yet

    """
    if self._daemon:
      return self._daemon.listen_port

    return None

  def _StartDaemon(self):
    """Starts the import daemon.

    """
    return self._lu.rpc.call_import_start(self.node_name, self._opts,
                                          self._instance, self._component,
                                          (self._dest, self._dest_args))

  def CheckListening(self):
    """Checks whether the daemon is listening.

    @rtype: bool
    @return: Whether the daemon is listening
    @raise _ImportExportError: If the listen timeout expired

    """
    assert self._daemon, "Daemon status missing"

    if self._ts_listening is not None:
      return True

    port = self._daemon.listen_port
    if port is not None:
      self._ts_listening = time.time()

      logging.debug("Import '%s' on %s is now listening on port %s",
                    self._daemon_name, self.node_name, port)

      self._cbs.ReportListening(self, self._private, self._component)

      return True

    if utils.TimeoutExpired(self._ts_begin, self._timeouts.listen):
      raise _ImportExportError("Not listening after %s seconds" %
                               self._timeouts.listen)

    return False

  def _GetConnectedCheckEpoch(self):
    """Returns the time since we started listening.

    """
    assert self._ts_listening is not None, \
           ("Checking whether an import is connected is only useful"
            " once it's been listening")

    return self._ts_listening
573

    
574

    
575
class DiskExport(_DiskImportExportBase):
  """Wrapper for a single disk export daemon.

  """
  MODE_TEXT = "export"

  def __init__(self, lu, node_name, opts, dest_host, dest_port,
               instance, component, source, source_args,
               timeouts, cbs, private=None):
    """Initializes this class.

    @param lu: Logical unit instance
    @type node_name: string
    @param node_name: Node name for import
    @type opts: L{objects.ImportExportOptions}
    @param opts: Import/export daemon options
    @type dest_host: string
    @param dest_host: Destination host name or IP address
    @type dest_port: number
    @param dest_port: Destination port number
    @type instance: L{objects.Instance}
    @param instance: Instance object
    @type component: string
    @param component: which part of the instance is being imported
    @param source: I/O source
    @param source_args: I/O source arguments
    @type timeouts: L{ImportExportTimeouts}
    @param timeouts: Timeouts for this import
    @type cbs: L{ImportExportCbBase}
    @param cbs: Callbacks
    @param private: Private data for callback functions

    """
    _DiskImportExportBase.__init__(self, lu, node_name, opts, instance,
                                   component, timeouts, cbs, private)
    self._dest_host = dest_host
    self._dest_port = dest_port
    self._source = source
    self._source_args = source_args

  def _StartDaemon(self):
    """Starts the export daemon.

    """
    return self._lu.rpc.call_export_start(self.node_name, self._opts,
                                          self._dest_host, self._dest_port,
                                          self._instance, self._component,
                                          (self._source, self._source_args))

  def CheckListening(self):
    """Checks whether the daemon is listening.

    @rtype: bool
    @return: Always C{True}

    """
    # Only an import can be listening
    return True

  def _GetConnectedCheckEpoch(self):
    """Returns the time since the daemon started.

    """
    assert self._ts_begin is not None

    return self._ts_begin
635

    
636

    
637
def FormatProgress(progress):
  """Formats progress information for user consumption.

  @type progress: tuple
  @param progress: (mbytes, throughput, percent, eta), as returned by
    L{_DiskImportExportBase.progress}
  @rtype: string

  """
  mbytes, throughput, percent, eta = progress

  pieces = [utils.FormatUnit(mbytes, "h")]

  # Not using FormatUnit as it doesn't support kilobytes
  pieces.append("%0.1f MiB/s" % throughput)

  if percent is not None:
    pieces.append("%d%%" % percent)

  if eta is not None:
    pieces.append("ETA %s" % utils.FormatSeconds(eta))

  return utils.CommaJoin(pieces)
657

    
658

    
659
class ImportExportLoop:
  """Main loop driving a set of import/export objects to completion.

  """
  #: Lower bound for the polling delay, in seconds
  MIN_DELAY = 1.0
  #: Upper bound for the polling delay, in seconds
  MAX_DELAY = 20.0

  def __init__(self, lu):
    """Initializes this class.

    @param lu: Logical unit instance

    """
    self._lu = lu
    self._queue = []
    self._pending_add = []

  def Add(self, diskie):
    """Adds an import/export object to the loop.

    @type diskie: Subclass of L{_DiskImportExportBase}
    @param diskie: Import/export object

    """
    assert diskie not in self._pending_add
    assert diskie.loop is None

    diskie.SetLoop(self)

    # Adding new objects to a staging list is necessary, otherwise the main
    # loop gets confused if callbacks modify the queue while the main loop is
    # iterating over it.
    self._pending_add.append(diskie)

  @staticmethod
  def _CollectDaemonStatus(lu, daemons):
    """Collects the status for all import/export daemons.

    @type daemons: dict
    @param daemons: Mapping of node names to lists of daemon names
    @rtype: dict
    @return: Mapping of node names to dicts mapping daemon names to
      their status; nodes whose status RPC failed are left out

    """
    daemon_status = {}

    for node_name, names in daemons.iteritems():
      result = lu.rpc.call_impexp_status(node_name, names)
      if result.fail_msg:
        lu.LogWarning("Failed to get daemon status on %s: %s",
                      node_name, result.fail_msg)
        continue

      assert len(names) == len(result.payload)

      daemon_status[node_name] = dict(zip(names, result.payload))

    return daemon_status

  @staticmethod
  def _GetActiveDaemonNames(queue):
    """Gets the names of all active daemons.

    Also starts daemons that haven't been started yet; objects whose
    daemon fails to start are finalized with an error.

    @rtype: dict
    @return: Mapping of node names to lists of daemon names

    """
    result = {}
    for diskie in queue:
      if not diskie.active:
        continue

      try:
        # Start daemon if necessary
        daemon_name = diskie.CheckDaemon()
      except _ImportExportError, err:
        logging.exception("%s failed", diskie.MODE_TEXT)
        diskie.Finalize(error=str(err))
        continue

      result.setdefault(diskie.node_name, []).append(daemon_name)

    assert len(queue) >= len(result)
    assert len(queue) >= sum([len(names) for names in result.itervalues()])

    logging.debug("daemons=%r", result)

    return result

  def _AddPendingToQueue(self):
    """Adds all pending import/export objects to the internal queue.

    """
    assert compat.all(diskie not in self._queue and diskie.loop == self
                      for diskie in self._pending_add)

    self._queue.extend(self._pending_add)

    del self._pending_add[:]

  def Run(self):
    """Utility main loop.

    Polls all queued import/export objects until none of them is active
    anymore, adapting the sleep delay to the least-advanced transfer.

    """
    while True:
      self._AddPendingToQueue()

      # Collect all active daemon names
      daemons = self._GetActiveDaemonNames(self._queue)
      if not daemons:
        break

      # Collect daemon status data
      data = self._CollectDaemonStatus(self._lu, daemons)

      # Use data
      delay = self.MAX_DELAY
      for diskie in self._queue:
        if not diskie.active:
          continue

        try:
          try:
            all_daemon_data = data[diskie.node_name]
          except KeyError:
            # Status RPC for this node failed; count it as an error
            result = diskie.SetDaemonData(False, None)
          else:
            result = \
              diskie.SetDaemonData(True,
                                   all_daemon_data[diskie.GetDaemonName()])

          if not result:
            # Daemon not yet ready, retry soon
            delay = min(3.0, delay)
            continue

          if diskie.CheckFinished():
            # Transfer finished
            diskie.Finalize()
            continue

          # Normal case: check again in 5 seconds
          delay = min(5.0, delay)

          if not diskie.CheckListening():
            # Not yet listening, retry soon
            delay = min(1.0, delay)
            continue

          if not diskie.CheckConnected():
            # Not yet connected, retry soon
            delay = min(1.0, delay)
            continue

        except _ImportExportError, err:
          logging.exception("%s failed", diskie.MODE_TEXT)
          diskie.Finalize(error=str(err))

      if not compat.any(diskie.active for diskie in self._queue):
        break

      # Wait a bit
      delay = min(self.MAX_DELAY, max(self.MIN_DELAY, delay))
      logging.debug("Waiting for %ss", delay)
      time.sleep(delay)

  def FinalizeAll(self):
    """Finalizes all pending transfers.

    @rtype: bool
    @return: Whether all transfers were finalized successfully

    """
    success = True

    for diskie in self._queue:
      success = diskie.Finalize() and success

    return success
822

    
823

    
824
class _TransferInstCbBase(ImportExportCbBase):
  """Holds the shared state for instance-data transfer callbacks.

  """
  def __init__(self, lu, feedback_fn, instance, timeouts, src_node, src_cbs,
               dest_node, dest_ip):
    """Initializes this class.

    @param lu: Logical unit instance
    @param feedback_fn: Feedback function
    @type instance: L{objects.Instance}
    @param instance: Instance object
    @type timeouts: L{ImportExportTimeouts}
    @param timeouts: Timeouts for the transfers
    @type src_node: string
    @param src_node: Source node name
    @param src_cbs: Callbacks for the source side (may be C{None})
    @type dest_node: string
    @param dest_node: Destination node name
    @param dest_ip: Destination node IP address

    """
    ImportExportCbBase.__init__(self)

    self.lu = lu
    self.feedback_fn = feedback_fn
    self.instance = instance
    self.timeouts = timeouts
    self.src_cbs = src_cbs
    self.src_node = src_node
    self.dest_ip = dest_ip
    self.dest_node = dest_node
840

    
841

    
842
class _TransferInstSourceCb(_TransferInstCbBase):
  """Callbacks for the source (exporting) side of a disk transfer.

  """
  def ReportConnected(self, ie, dtp):
    """Called when a connection has been established.

    @type ie: L{DiskExport}
    @param ie: Export object
    @type dtp: L{_DiskTransferPrivate}
    @param dtp: Per-transfer private data

    """
    # The source side carries no further callbacks of its own and the export
    # must belong to this transfer
    assert self.src_cbs is None
    assert dtp.src_export == ie
    assert dtp.dest_import

    self.feedback_fn("%s is sending data on %s" %
                     (dtp.data.name, ie.node_name))

  def ReportProgress(self, ie, dtp):
    """Called when new progress information should be reported.

    """
    progress = ie.progress
    if not progress:
      return

    self.feedback_fn("%s sent %s" % (dtp.data.name, FormatProgress(progress)))

  def ReportFinished(self, ie, dtp):
    """Called when a transfer has finished.

    """
    assert self.src_cbs is None
    assert dtp.src_export == ie
    assert dtp.dest_import

    if ie.success:
      self.feedback_fn("%s finished sending data" % dtp.data.name)
    else:
      self.feedback_fn("%s failed to send data: %s (recent output: %s)" %
                       (dtp.data.name, ie.final_message, ie.recent_output))

    dtp.RecordResult(ie.success)

    # Run the per-transfer callback, if one was given
    cb = dtp.data.finished_fn
    if cb:
      cb()

    # TODO: Check whether sending SIGTERM right away is okay, maybe we should
    # give the daemon a moment to sort things out
    if dtp.dest_import and not ie.success:
      dtp.dest_import.Abort()
888

    
889

    
890
class _TransferInstDestCb(_TransferInstCbBase):
  """Callbacks for the destination (importing) side of a disk transfer.

  """
  def ReportListening(self, ie, dtp, component):
    """Called when daemon started listening.

    @type ie: L{DiskImport}
    @param ie: Import object
    @type dtp: L{_DiskTransferPrivate}
    @param dtp: Per-transfer private data
    @param component: transfer component name

    """
    assert self.src_cbs
    assert dtp.src_export is None
    assert dtp.dest_import
    assert dtp.export_opts

    self.feedback_fn("%s is now listening, starting export" % dtp.data.name)

    # Start export on source node
    de = DiskExport(self.lu, self.src_node, dtp.export_opts,
                    self.dest_ip, ie.listen_port, self.instance,
                    component, dtp.data.src_io, dtp.data.src_ioargs,
                    self.timeouts, self.src_cbs, private=dtp)
    ie.loop.Add(de)

    dtp.src_export = de

  def ReportConnected(self, ie, dtp):
    """Called when a connection has been established.

    """
    self.feedback_fn("%s is receiving data on %s" %
                     (dtp.data.name, self.dest_node))

  def ReportFinished(self, ie, dtp):
    """Called when a transfer has finished.

    """
    if ie.success:
      self.feedback_fn("%s finished receiving data" % dtp.data.name)
    else:
      self.feedback_fn("%s failed to receive data: %s (recent output: %s)" %
                       (dtp.data.name, ie.final_message, ie.recent_output))

    dtp.RecordResult(ie.success)

    # TODO: Check whether sending SIGTERM right away is okay, maybe we should
    # give the daemon a moment to sort things out
    if dtp.src_export and not ie.success:
      dtp.src_export.Abort()
934

    
935

    
936
class DiskTransfer(object):
  """Describes a single disk transfer: its source and destination I/O.

  """
  def __init__(self, name, src_io, src_ioargs, dest_io, dest_ioargs,
               finished_fn):
    """Initializes this class.

    @type name: string
    @param name: User-visible name for this transfer (e.g. "disk/0")
    @param src_io: Source I/O type
    @param src_ioargs: Source I/O arguments
    @param dest_io: Destination I/O type
    @param dest_ioargs: Destination I/O arguments
    @type finished_fn: callable
    @param finished_fn: Function called once transfer has finished

    """
    self.name = name
    self.finished_fn = finished_fn

    # Source side
    self.src_io = src_io
    self.src_ioargs = src_ioargs

    # Destination side
    self.dest_io = dest_io
    self.dest_ioargs = dest_ioargs
960

    
961

    
962
class _DiskTransferPrivate(object):
963
  def __init__(self, data, success, export_opts):
964
    """Initializes this class.
965

966
    @type data: L{DiskTransfer}
967
    @type success: bool
968

969
    """
970
    self.data = data
971
    self.success = success
972
    self.export_opts = export_opts
973

    
974
    self.src_export = None
975
    self.dest_import = None
976

    
977
  def RecordResult(self, success):
978
    """Updates the status.
979

980
    One failed part will cause the whole transfer to fail.
981

982
    """
983
    self.success = self.success and success
984

    
985

    
986
def _GetInstDiskMagic(base, instance_name, index):
  """Computes the magic value for a disk export or import.

  @type base: string
  @param base: Random seed value (can be the same for all disks of a transfer)
  @type instance_name: string
  @param instance_name: Name of instance
  @type index: number
  @param index: Disk index
  @rtype: string
  @return: Hex digest uniquely identifying this disk's transfer

  """
  h = compat.sha1_hash()

  # Hash the protocol version together with the transfer-specific parts
  for part in (str(constants.RIE_VERSION), base, instance_name, str(index)):
    h.update(part)

  return h.hexdigest()
1003

    
1004

    
1005
def TransferInstanceData(lu, feedback_fn, src_node, dest_node, dest_ip,
                         instance, all_transfers):
  """Transfers an instance's data from one node to another.

  @param lu: Logical unit instance
  @param feedback_fn: Feedback function
  @type src_node: string
  @param src_node: Source node name
  @type dest_node: string
  @param dest_node: Destination node name
  @type dest_ip: string
  @param dest_ip: IP address of destination node
  @type instance: L{objects.Instance}
  @param instance: Instance object
  @type all_transfers: list of L{DiskTransfer} instances
  @param all_transfers: List of all disk transfers to be made
  @rtype: list
  @return: List with a boolean (True=successful, False=failed) for success for
           each transfer

  """
  # Disable compression for all moves as these are all within the same cluster
  compress = constants.IEC_NONE

  logging.debug("Source node %s, destination node %s, compression '%s'",
                src_node, dest_node, compress)

  timeouts = ImportExportTimeouts(constants.DISK_TRANSFER_CONNECT_TIMEOUT)
  # The destination callbacks receive a reference to the source callbacks;
  # presumably so the export side can be started once the import daemon is
  # listening (callback classes are defined earlier in this file) -- TODO
  # confirm
  src_cbs = _TransferInstSourceCb(lu, feedback_fn, instance, timeouts,
                                  src_node, None, dest_node, dest_ip)
  dest_cbs = _TransferInstDestCb(lu, feedback_fn, instance, timeouts,
                                 src_node, src_cbs, dest_node, dest_ip)

  all_dtp = []

  # Random seed shared by all per-disk magic values
  base_magic = utils.GenerateSecret(6)

  ieloop = ImportExportLoop(lu)
  try:
    for idx, transfer in enumerate(all_transfers):
      if transfer:
        feedback_fn("Exporting %s from %s to %s" %
                    (transfer.name, src_node, dest_node))

        magic = _GetInstDiskMagic(base_magic, instance.name, idx)
        opts = objects.ImportExportOptions(key_name=None, ca_pem=None,
                                           compress=compress, magic=magic)

        dtp = _DiskTransferPrivate(transfer, True, opts)

        # Only the import daemon is created here; dtp.src_export is filled in
        # elsewhere (note the assertion below checks it may remain None)
        di = DiskImport(lu, dest_node, opts, instance, "disk%d" % idx,
                        transfer.dest_io, transfer.dest_ioargs,
                        timeouts, dest_cbs, private=dtp)
        ieloop.Add(di)

        dtp.dest_import = di
      else:
        # Skipped disk; recorded as unsuccessful so result indices stay
        # aligned with C{all_transfers}
        dtp = _DiskTransferPrivate(None, False, None)

      all_dtp.append(dtp)

    ieloop.Run()
  finally:
    # Always finalize daemons, even if the loop raised
    ieloop.FinalizeAll()

  assert len(all_dtp) == len(all_transfers)
  assert compat.all((dtp.src_export is None or
                      dtp.src_export.success is not None) and
                     (dtp.dest_import is None or
                      dtp.dest_import.success is not None)
                     for dtp in all_dtp), \
         "Not all imports/exports are finalized"

  return [bool(dtp.success) for dtp in all_dtp]


class _RemoteExportCb(ImportExportCbBase):
  """Callbacks for exporting disks to a remote cluster.

  """
  def __init__(self, feedback_fn, disk_count):
    """Initializes this class.

    @param feedback_fn: Feedback function
    @type disk_count: number
    @param disk_count: Number of disks being exported

    """
    ImportExportCbBase.__init__(self)
    self._feedback_fn = feedback_fn
    # One slot per disk, set to a boolean once its transfer finishes
    self._dresults = [None] * disk_count

  @property
  def disk_results(self):
    """Returns per-disk results.

    """
    return self._dresults

  def ReportConnected(self, ie, private):
    """Called when a connection has been established.

    """
    (idx, _) = private
    self._feedback_fn("Disk %s is now sending data" % idx)

  def ReportProgress(self, ie, private):
    """Called when new progress information should be reported.

    """
    (idx, _) = private
    prog = ie.progress
    if prog:
      self._feedback_fn("Disk %s sent %s" % (idx, FormatProgress(prog)))

  def ReportFinished(self, ie, private):
    """Called when a transfer has finished.

    """
    (idx, finished_fn) = private

    if ie.success:
      self._feedback_fn("Disk %s finished sending data" % idx)
    else:
      self._feedback_fn("Disk %s failed to send data: %s (recent output: %s)" %
                        (idx, ie.final_message, ie.recent_output))

    self._dresults[idx] = bool(ie.success)

    if finished_fn:
      finished_fn()


class ExportInstanceHelper:
  """Helper for exporting an instance via LVM snapshots.

  Typical usage: L{CreateSnapshots}, then L{LocalExport} or L{RemoteExport},
  then L{Cleanup} to remove any remaining snapshots.

  """
  def __init__(self, lu, feedback_fn, instance):
    """Initializes this class.

    @param lu: Logical unit instance
    @param feedback_fn: Feedback function
    @type instance: L{objects.Instance}
    @param instance: Instance object

    """
    self._lu = lu
    self._feedback_fn = feedback_fn
    self._instance = instance

    # Filled by L{CreateSnapshots}; one entry per instance disk
    self._snap_disks = []
    # Tracks which snapshots have already been removed, by disk index
    self._removed_snaps = [False] * len(instance.disks)

  def CreateSnapshots(self):
    """Creates an LVM snapshot for every disk of the instance.

    """
    assert not self._snap_disks

    instance = self._instance
    src_node = instance.primary_node

    for idx, disk in enumerate(instance.disks):
      self._feedback_fn("Creating a snapshot of disk/%s on node %s" %
                        (idx, src_node))

      # result.payload will be a snapshot of an lvm leaf of the one we
      # passed
      result = self._lu.rpc.call_blockdev_snapshot(src_node, disk)
      # False acts as a placeholder for failed snapshots; it is still
      # appended so indices stay aligned with C{instance.disks}
      new_dev = False
      msg = result.fail_msg
      if msg:
        self._lu.LogWarning("Could not snapshot disk/%s on node %s: %s",
                            idx, src_node, msg)
      elif (not isinstance(result.payload, (tuple, list)) or
            len(result.payload) != 2):
        self._lu.LogWarning("Could not snapshot disk/%s on node %s: invalid"
                            " result '%s'", idx, src_node, result.payload)
      else:
        disk_id = tuple(result.payload)
        disk_params = constants.DISK_LD_DEFAULTS[constants.LD_LV].copy()
        new_dev = objects.Disk(dev_type=constants.LD_LV, size=disk.size,
                               logical_id=disk_id, physical_id=disk_id,
                               iv_name=disk.iv_name,
                               params=disk_params)

      self._snap_disks.append(new_dev)

    assert len(self._snap_disks) == len(instance.disks)
    assert len(self._removed_snaps) == len(instance.disks)

  def _RemoveSnapshot(self, disk_index):
    """Removes an LVM snapshot.

    Safe to call repeatedly; does nothing for missing or already-removed
    snapshots.

    @type disk_index: number
    @param disk_index: Index of the snapshot to be removed

    """
    disk = self._snap_disks[disk_index]
    if disk and not self._removed_snaps[disk_index]:
      src_node = self._instance.primary_node

      self._feedback_fn("Removing snapshot of disk/%s on node %s" %
                        (disk_index, src_node))

      result = self._lu.rpc.call_blockdev_remove(src_node, disk)
      if result.fail_msg:
        self._lu.LogWarning("Could not remove snapshot for disk/%d from node"
                            " %s: %s", disk_index, src_node, result.fail_msg)
      else:
        self._removed_snaps[disk_index] = True

  def LocalExport(self, dest_node):
    """Intra-cluster instance export.

    @type dest_node: L{objects.Node}
    @param dest_node: Destination node

    """
    instance = self._instance
    src_node = instance.primary_node

    assert len(self._snap_disks) == len(instance.disks)

    transfers = []

    for idx, dev in enumerate(self._snap_disks):
      # Failed snapshots (False placeholders) are skipped but keep their slot
      if not dev:
        transfers.append(None)
        continue

      path = utils.PathJoin(constants.EXPORT_DIR, "%s.new" % instance.name,
                            dev.physical_id[1])

      # Each finished transfer removes its snapshot (see _TransferFinished)
      finished_fn = compat.partial(self._TransferFinished, idx)

      # FIXME: pass debug option from opcode to backend
      dt = DiskTransfer("snapshot/%s" % idx,
                        constants.IEIO_SCRIPT, (dev, idx),
                        constants.IEIO_FILE, (path, ),
                        finished_fn)
      transfers.append(dt)

    # Actually export data
    dresults = TransferInstanceData(self._lu, self._feedback_fn,
                                    src_node, dest_node.name,
                                    dest_node.secondary_ip,
                                    instance, transfers)

    assert len(dresults) == len(instance.disks)

    self._feedback_fn("Finalizing export on %s" % dest_node.name)
    result = self._lu.rpc.call_finalize_export(dest_node.name, instance,
                                               self._snap_disks)
    msg = result.fail_msg
    fin_resu = not msg
    if msg:
      self._lu.LogWarning("Could not finalize export for instance %s"
                          " on node %s: %s", instance.name, dest_node.name, msg)

    return (fin_resu, dresults)

  def RemoteExport(self, disk_info, key_name, dest_ca_pem, timeouts):
    """Inter-cluster instance export.

    @type disk_info: list
    @param disk_info: Per-disk destination information
    @type key_name: string
    @param key_name: Name of X509 key to use
    @type dest_ca_pem: string
    @param dest_ca_pem: Destination X509 CA in PEM format
    @type timeouts: L{ImportExportTimeouts}
    @param timeouts: Timeouts for this import

    """
    instance = self._instance

    assert len(disk_info) == len(instance.disks)

    cbs = _RemoteExportCb(self._feedback_fn, len(instance.disks))

    ieloop = ImportExportLoop(self._lu)
    try:
      for idx, (dev, (host, port, magic)) in enumerate(zip(instance.disks,
                                                           disk_info)):
        # Decide whether to use IPv6
        ipv6 = netutils.IP6Address.IsValid(host)

        opts = objects.ImportExportOptions(key_name=key_name,
                                           ca_pem=dest_ca_pem,
                                           magic=magic, ipv6=ipv6)

        self._feedback_fn("Sending disk %s to %s:%s" % (idx, host, port))
        finished_fn = compat.partial(self._TransferFinished, idx)
        ieloop.Add(DiskExport(self._lu, instance.primary_node,
                              opts, host, port, instance, "disk%d" % idx,
                              constants.IEIO_SCRIPT, (dev, idx),
                              timeouts, cbs, private=(idx, finished_fn)))

      ieloop.Run()
    finally:
      # Always finalize daemons, even if the loop raised
      ieloop.FinalizeAll()

    return (True, cbs.disk_results)

  def _TransferFinished(self, idx):
    """Called once a transfer has finished.

    @type idx: number
    @param idx: Disk index

    """
    logging.debug("Transfer %s finished", idx)
    self._RemoveSnapshot(idx)

  def Cleanup(self):
    """Remove all snapshots.

    """
    assert len(self._removed_snaps) == len(self._instance.disks)
    for idx in range(len(self._instance.disks)):
      self._RemoveSnapshot(idx)


class _RemoteImportCb(ImportExportCbBase):
  """Callbacks for importing disks from a remote cluster.

  """
  def __init__(self, feedback_fn, cds, x509_cert_pem, disk_count,
               external_address):
    """Initializes this class.

    @type cds: string
    @param cds: Cluster domain secret
    @type x509_cert_pem: string
    @param x509_cert_pem: CA used for signing import key
    @type disk_count: number
    @param disk_count: Number of disks
    @type external_address: string
    @param external_address: External address of destination node

    """
    ImportExportCbBase.__init__(self)
    self._feedback_fn = feedback_fn
    self._cds = cds
    self._x509_cert_pem = x509_cert_pem
    self._disk_count = disk_count
    self._external_address = external_address

    # Per-disk results; set to a boolean once a transfer finishes
    self._dresults = [None] * disk_count
    # Per-disk (listen_port, magic) tuples, set while the respective import
    # daemon is listening (see ReportListening/ReportFinished)
    self._daemon_port = [None] * disk_count

    # Salt used for all HMACs of this import
    self._salt = utils.GenerateSecret(8)

  @property
  def disk_results(self):
    """Returns per-disk results.

    """
    return self._dresults

  def _CheckAllListening(self):
    """Checks whether all daemons are listening.

    If all daemons are listening, the information is sent to the client.

    """
    # Wait until every disk's daemon has reported its port
    if not compat.all(dp is not None for dp in self._daemon_port):
      return

    host = self._external_address

    disks = []
    for idx, (port, magic) in enumerate(self._daemon_port):
      disks.append(ComputeRemoteImportDiskInfo(self._cds, self._salt,
                                               idx, host, port, magic))

    assert len(disks) == self._disk_count

    self._feedback_fn(constants.ELOG_REMOTE_IMPORT, {
      "disks": disks,
      "x509_ca": self._x509_cert_pem,
      })

  def ReportListening(self, ie, private, _):
    """Called when daemon started listening.

    """
    (idx, ) = private

    self._feedback_fn("Disk %s is now listening" % idx)

    assert self._daemon_port[idx] is None

    self._daemon_port[idx] = (ie.listen_port, ie.magic)

    self._CheckAllListening()

  def ReportConnected(self, ie, private):
    """Called when a connection has been established.

    """
    (idx, ) = private

    self._feedback_fn("Disk %s is now receiving data" % idx)

  def ReportFinished(self, ie, private):
    """Called when a transfer has finished.

    """
    (idx, ) = private

    # Daemon is certainly no longer listening
    self._daemon_port[idx] = None

    if ie.success:
      self._feedback_fn("Disk %s finished receiving data" % idx)
    else:
      self._feedback_fn(("Disk %s failed to receive data: %s"
                         " (recent output: %s)") %
                        (idx, ie.final_message, ie.recent_output))

    self._dresults[idx] = bool(ie.success)


def RemoteImport(lu, feedback_fn, instance, pnode, source_x509_ca,
                 cds, timeouts):
  """Imports an instance from another cluster.

  @param lu: Logical unit instance
  @param feedback_fn: Feedback function
  @type instance: L{objects.Instance}
  @param instance: Instance object
  @type pnode: L{objects.Node}
  @param pnode: Primary node of instance as an object
  @type source_x509_ca: OpenSSL.crypto.X509
  @param source_x509_ca: Import source's X509 CA
  @type cds: string
  @param cds: Cluster domain secret
  @type timeouts: L{ImportExportTimeouts}
  @param timeouts: Timeouts for this import

  """
  source_ca_pem = OpenSSL.crypto.dump_certificate(OpenSSL.crypto.FILETYPE_PEM,
                                                  source_x509_ca)

  # Random seed shared by all per-disk magic values
  magic_base = utils.GenerateSecret(6)

  # Decide whether to use IPv6
  ipv6 = netutils.IP6Address.IsValid(pnode.primary_ip)

  # Create crypto key
  result = lu.rpc.call_x509_cert_create(instance.primary_node,
                                        constants.RIE_CERT_VALIDITY)
  result.Raise("Can't create X509 key and certificate on %s" % result.node)

  (x509_key_name, x509_cert_pem) = result.payload
  try:
    # Load certificate
    x509_cert = OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_PEM,
                                                x509_cert_pem)

    # Sign certificate
    signed_x509_cert_pem = \
      utils.SignX509Certificate(x509_cert, cds, utils.GenerateSecret(8))

    cbs = _RemoteImportCb(feedback_fn, cds, signed_x509_cert_pem,
                          len(instance.disks), pnode.primary_ip)

    ieloop = ImportExportLoop(lu)
    try:
      for idx, dev in enumerate(instance.disks):
        magic = _GetInstDiskMagic(magic_base, instance.name, idx)

        # Import daemon options
        opts = objects.ImportExportOptions(key_name=x509_key_name,
                                           ca_pem=source_ca_pem,
                                           magic=magic, ipv6=ipv6)

        ieloop.Add(DiskImport(lu, instance.primary_node, opts, instance,
                              "disk%d" % idx,
                              constants.IEIO_SCRIPT, (dev, idx),
                              timeouts, cbs, private=(idx, )))

      ieloop.Run()
    finally:
      ieloop.FinalizeAll()
  finally:
    # Remove crypto key and certificate
    # NOTE(review): a failure here raises from the finally block and would
    # mask any exception raised by the import itself
    result = lu.rpc.call_x509_cert_remove(instance.primary_node, x509_key_name)
    result.Raise("Can't remove X509 key and certificate on %s" % result.node)

  return cbs.disk_results


def _GetImportExportHandshakeMessage(version):
  """Returns the handshake message for a RIE protocol version.

  @type version: number
  @param version: Remote import/export protocol version
  @rtype: string
  @return: Protocol version and handshake constant, colon-separated

  """
  return ":".join([str(version), str(constants.RIE_HANDSHAKE)])


def ComputeRemoteExportHandshake(cds):
  """Computes the remote import/export handshake.

  @type cds: string
  @param cds: Cluster domain secret
  @rtype: tuple
  @return: Protocol version, HMAC digest and the salt it was computed with

  """
  hmac_salt = utils.GenerateSecret(8)
  message = _GetImportExportHandshakeMessage(constants.RIE_VERSION)
  digest = utils.Sha1Hmac(cds, message, salt=hmac_salt)
  return (constants.RIE_VERSION, digest, hmac_salt)


def CheckRemoteExportHandshake(cds, handshake):
1513
  """Checks the handshake of a remote import/export.
1514

1515
  @type cds: string
1516
  @param cds: Cluster domain secret
1517
  @type handshake: sequence
1518
  @param handshake: Handshake sent by remote peer
1519

1520
  """
1521
  try:
1522
    (version, hmac_digest, hmac_salt) = handshake
1523
  except (TypeError, ValueError), err:
1524
    return "Invalid data: %s" % err
1525

    
1526
  if not utils.VerifySha1Hmac(cds, _GetImportExportHandshakeMessage(version),
1527
                              hmac_digest, salt=hmac_salt):
1528
    return "Hash didn't match, clusters don't share the same domain secret"
1529

    
1530
  if version != constants.RIE_VERSION:
1531
    return ("Clusters don't have the same remote import/export protocol"
1532
            " (local=%s, remote=%s)" %
1533
            (constants.RIE_VERSION, version))
1534

    
1535
  return None
1536

    
1537

    
1538
def _GetRieDiskInfoMessage(disk_index, host, port, magic):
1539
  """Returns the hashed text for import/export disk information.
1540

1541
  @type disk_index: number
1542
  @param disk_index: Index of disk (included in hash)
1543
  @type host: string
1544
  @param host: Hostname
1545
  @type port: number
1546
  @param port: Daemon port
1547
  @type magic: string
1548
  @param magic: Magic value
1549

1550
  """
1551
  return "%s:%s:%s:%s" % (disk_index, host, port, magic)
1552

    
1553

    
1554
def CheckRemoteExportDiskInfo(cds, disk_index, disk_info):
1555
  """Verifies received disk information for an export.
1556

1557
  @type cds: string
1558
  @param cds: Cluster domain secret
1559
  @type disk_index: number
1560
  @param disk_index: Index of disk (included in hash)
1561
  @type disk_info: sequence
1562
  @param disk_info: Disk information sent by remote peer
1563

1564
  """
1565
  try:
1566
    (host, port, magic, hmac_digest, hmac_salt) = disk_info
1567
  except (TypeError, ValueError), err:
1568
    raise errors.GenericError("Invalid data: %s" % err)
1569

    
1570
  if not (host and port and magic):
1571
    raise errors.GenericError("Missing destination host, port or magic")
1572

    
1573
  msg = _GetRieDiskInfoMessage(disk_index, host, port, magic)
1574

    
1575
  if not utils.VerifySha1Hmac(cds, msg, hmac_digest, salt=hmac_salt):
1576
    raise errors.GenericError("HMAC is wrong")
1577

    
1578
  if netutils.IP6Address.IsValid(host) or netutils.IP4Address.IsValid(host):
1579
    destination = host
1580
  else:
1581
    destination = netutils.Hostname.GetNormalizedName(host)
1582

    
1583
  return (destination,
1584
          utils.ValidateServiceName(port),
1585
          magic)
1586

    
1587

    
1588
def ComputeRemoteImportDiskInfo(cds, salt, disk_index, host, port, magic):
  """Computes the signed disk information for a remote import.

  @type cds: string
  @param cds: Cluster domain secret
  @type salt: string
  @param salt: HMAC salt
  @type disk_index: number
  @param disk_index: Index of disk (included in hash)
  @type host: string
  @param host: Hostname
  @type port: number
  @param port: Daemon port
  @type magic: string
  @param magic: Magic value

  """
  # Sign the canonical disk-info message with the cluster domain secret
  digest = utils.Sha1Hmac(cds,
                          _GetRieDiskInfoMessage(disk_index, host, port,
                                                 magic),
                          salt=salt)
  return (host, port, magic, digest, salt)