Statistics
| Branch: | Tag: | Revision:

root / lib / rpc.py @ f3e513ad

History | View | Annotate | Download (19.4 kB)

1
#
2
#
3

    
4
# Copyright (C) 2006, 2007 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Script to show add a new node to the cluster
23

24
"""
25

    
26
# pylint: disable-msg=C0103
27

    
28
import os
29

    
30
from twisted.internet.pollreactor import PollReactor
31

    
32
class ReReactor(PollReactor):
33
  """A re-startable Reactor implementation.
34

35
  """
36
  def run(self, installSignalHandlers=1):
37
    """Custom run method.
38

39
    This is customized run that, before calling Reactor.run, will
40
    reinstall the shutdown events and re-create the threadpool in case
41
    these are not present (as will happen on the second run of the
42
    reactor).
43

44
    """
45
    if not 'shutdown' in self._eventTriggers:
46
      # the shutdown queue has been killed, we are most probably
47
      # at the second run, thus recreate the queue
48
      self.addSystemEventTrigger('during', 'shutdown', self.crash)
49
      self.addSystemEventTrigger('during', 'shutdown', self.disconnectAll)
50
    if self.threadpool is not None and self.threadpool.joined == 1:
51
      # in case the threadpool has been stopped, re-start it
52
      # and add a trigger to stop it at reactor shutdown
53
      self.threadpool.start()
54
      self.addSystemEventTrigger('during', 'shutdown', self.threadpool.stop)
55

    
56
    return PollReactor.run(self, installSignalHandlers)
57

    
58

    
59
import twisted.internet.main
60
twisted.internet.main.installReactor(ReReactor())
61

    
62
from twisted.spread import pb
63
from twisted.internet import reactor
64
from twisted.cred import credentials
65
from OpenSSL import SSL, crypto
66

    
67
from ganeti import logger
68
from ganeti import utils
69
from ganeti import errors
70
from ganeti import constants
71
from ganeti import objects
72
from ganeti import ssconf
73

    
74
class NodeController:
75
  """Node-handling class.
76

77
  For each node that we speak with, we create an instance of this
78
  class, so that we have a safe place to store the details of this
79
  individual call.
80

81
  """
82
  def __init__(self, parent, node):
83
    self.parent = parent
84
    self.node = node
85

    
86
  def _check_end(self):
87
    """Stop the reactor if we got all the results.
88

89
    """
90
    if len(self.parent.results) == len(self.parent.nc):
91
      reactor.stop()
92

    
93
  def cb_call(self, obj):
94
    """Callback for successful connect.
95

96
    If the connect and login sequence succeeded, we proceed with
97
    making the actual call.
98

99
    """
100
    deferred = obj.callRemote(self.parent.procedure, self.parent.args)
101
    deferred.addCallbacks(self.cb_done, self.cb_err2)
102

    
103
  def cb_done(self, result):
104
    """Callback for successful call.
105

106
    When we receive the result from a call, we check if it was an
107
    error and if so we raise a generic RemoteError (we can't pass yet
108
    the actual exception over). If there was no error, we store the
109
    result.
110

111
    """
112
    tb, self.parent.results[self.node] = result
113
    self._check_end()
114
    if tb:
115
      raise errors.RemoteError("Remote procedure error calling %s on %s:"
116
                               "\n%s" % (self.parent.procedure,
117
                                         self.node,
118
                                         tb))
119

    
120
  def cb_err1(self, reason):
121
    """Error callback for unsuccessful connect.
122

123
    """
124
    logger.Error("caller_connect: could not connect to remote host %s,"
125
                 " reason %s" % (self.node, reason))
126
    self.parent.results[self.node] = False
127
    self._check_end()
128

    
129
  def cb_err2(self, reason):
130
    """Error callback for unsuccessful call.
131

132
    This is when the call didn't return anything, not even an error,
133
    or when it time out, etc.
134

135
    """
136
    logger.Error("caller_call: could not call %s on node %s,"
137
                 " reason %s" % (self.parent.procedure, self.node, reason))
138
    self.parent.results[self.node] = False
139
    self._check_end()
140

    
141

    
142
class MirrorContextFactory:
143
  """Certificate verifier factory.
144

145
  This factory creates contexts that verify if the remote end has a
146
  specific certificate (i.e. our own certificate).
147

148
  The checks we do are that the PEM dump of the certificate is the
149
  same as our own and (somewhat redundantly) that the SHA checksum is
150
  the same.
151

152
  """
153
  isClient = 1
154

    
155
  def __init__(self):
156
    try:
157
      fd = open(constants.SSL_CERT_FILE, 'r')
158
      try:
159
        data = fd.read(16384)
160
      finally:
161
        fd.close()
162
    except EnvironmentError, err:
163
      raise errors.ConfigurationError("missing SSL certificate: %s" %
164
                                      str(err))
165
    self.mycert = crypto.load_certificate(crypto.FILETYPE_PEM, data)
166
    self.mypem = crypto.dump_certificate(crypto.FILETYPE_PEM, self.mycert)
167
    self.mydigest = self.mycert.digest('SHA')
168

    
169
  def verifier(self, conn, x509, errno, err_depth, retcode):
170
    """Certificate verify method.
171

172
    """
173
    if self.mydigest != x509.digest('SHA'):
174
      return False
175
    if crypto.dump_certificate(crypto.FILETYPE_PEM, x509) != self.mypem:
176
      return False
177
    return True
178

    
179
  def getContext(self):
180
    """Context generator.
181

182
    """
183
    context = SSL.Context(SSL.TLSv1_METHOD)
184
    context.set_verify(SSL.VERIFY_PEER, self.verifier)
185
    return context
186

    
187
class Client:
188
  """RPC Client class.
189

190
  This class, given a (remote) method name, a list of parameters and a
191
  list of nodes, will contact (in parallel) all nodes, and return a
192
  dict of results (key: node name, value: result).
193

194
  One current bug is that generic failure is still signalled by
195
  'False' result, which is not good. This overloading of values can
196
  cause bugs.
197

198
  """
199
  result_set = False
200
  result = False
201
  allresult = []
202

    
203
  def __init__(self, procedure, args):
204
    ss = ssconf.SimpleStore()
205
    self.port = ss.GetNodeDaemonPort()
206
    self.nodepw = ss.GetNodeDaemonPassword()
207
    self.nc = {}
208
    self.results = {}
209
    self.procedure = procedure
210
    self.args = args
211

    
212
  #--- generic connector -------------
213

    
214
  def connect_list(self, node_list):
215
    """Add a list of nodes to the target nodes.
216

217
    """
218
    for node in node_list:
219
      self.connect(node)
220

    
221
  def connect(self, connect_node):
222
    """Add a node to the target list.
223

224
    """
225
    factory = pb.PBClientFactory()
226
    self.nc[connect_node] = nc = NodeController(self, connect_node)
227
    reactor.connectSSL(connect_node, self.port, factory,
228
                       MirrorContextFactory())
229
    #d = factory.getRootObject()
230
    d = factory.login(credentials.UsernamePassword("master_node", self.nodepw))
231
    d.addCallbacks(nc.cb_call, nc.cb_err1)
232

    
233
  def getresult(self):
234
    """Return the results of the call.
235

236
    """
237
    return self.results
238

    
239
  def run(self):
240
    """Wrapper over reactor.run().
241

242
    This function simply calls reactor.run() if we have any requests
243
    queued, otherwise it does nothing.
244

245
    """
246
    if self.nc:
247
      reactor.run()
248

    
249

    
250
def call_volume_list(node_list, vg_name):
251
  """Gets the logical volumes present in a given volume group.
252

253
  This is a multi-node call.
254

255
  """
256
  c = Client("volume_list", [vg_name])
257
  c.connect_list(node_list)
258
  c.run()
259
  return c.getresult()
260

    
261

    
262
def call_vg_list(node_list):
263
  """Gets the volume group list.
264

265
  This is a multi-node call.
266

267
  """
268
  c = Client("vg_list", [])
269
  c.connect_list(node_list)
270
  c.run()
271
  return c.getresult()
272

    
273

    
274
def call_bridges_exist(node, bridges_list):
275
  """Checks if a node has all the bridges given.
276

277
  This method checks if all bridges given in the bridges_list are
278
  present on the remote node, so that an instance that uses interfaces
279
  on those bridges can be started.
280

281
  This is a single-node call.
282

283
  """
284
  c = Client("bridges_exist", [bridges_list])
285
  c.connect(node)
286
  c.run()
287
  return c.getresult().get(node, False)
288

    
289

    
290
def call_instance_start(node, instance, extra_args):
291
  """Starts an instance.
292

293
  This is a single-node call.
294

295
  """
296
  c = Client("instance_start", [instance.ToDict(), extra_args])
297
  c.connect(node)
298
  c.run()
299
  return c.getresult().get(node, False)
300

    
301

    
302
def call_instance_shutdown(node, instance):
303
  """Stops an instance.
304

305
  This is a single-node call.
306

307
  """
308
  c = Client("instance_shutdown", [instance.ToDict()])
309
  c.connect(node)
310
  c.run()
311
  return c.getresult().get(node, False)
312

    
313

    
314
def call_instance_reboot(node, instance, reboot_type, extra_args):
315
  """Reboots an instance.
316

317
  This is a single-node call.
318

319
  """
320
  c = Client("instance_reboot", [instance.ToDict(), reboot_type, extra_args])
321
  c.connect(node)
322
  c.run()
323
  return c.getresult().get(node, False)
324

    
325

    
326
def call_instance_os_add(node, inst, osdev, swapdev):
327
  """Installs an OS on the given instance.
328

329
  This is a single-node call.
330

331
  """
332
  params = [inst.ToDict(), osdev, swapdev]
333
  c = Client("instance_os_add", params)
334
  c.connect(node)
335
  c.run()
336
  return c.getresult().get(node, False)
337

    
338

    
339
def call_instance_run_rename(node, inst, old_name, osdev, swapdev):
340
  """Run the OS rename script for an instance.
341

342
  This is a single-node call.
343

344
  """
345
  params = [inst.ToDict(), old_name, osdev, swapdev]
346
  c = Client("instance_run_rename", params)
347
  c.connect(node)
348
  c.run()
349
  return c.getresult().get(node, False)
350

    
351

    
352
def call_instance_info(node, instance):
353
  """Returns information about a single instance.
354

355
  This is a single-node call.
356

357
  """
358
  c = Client("instance_info", [instance])
359
  c.connect(node)
360
  c.run()
361
  return c.getresult().get(node, False)
362

    
363

    
364
def call_all_instances_info(node_list):
365
  """Returns information about all instances on a given node.
366

367
  This is a single-node call.
368

369
  """
370
  c = Client("all_instances_info", [])
371
  c.connect_list(node_list)
372
  c.run()
373
  return c.getresult()
374

    
375

    
376
def call_instance_list(node_list):
377
  """Returns the list of running instances on a given node.
378

379
  This is a single-node call.
380

381
  """
382
  c = Client("instance_list", [])
383
  c.connect_list(node_list)
384
  c.run()
385
  return c.getresult()
386

    
387

    
388
def call_node_tcp_ping(node, source, target, port, timeout, live_port_needed):
389
  """Do a TcpPing on the remote node
390

391
  This is a single-node call.
392
  """
393
  c = Client("node_tcp_ping", [source, target, port, timeout,
394
                               live_port_needed])
395
  c.connect(node)
396
  c.run()
397
  return c.getresult().get(node, False)
398

    
399

    
400
def call_node_info(node_list, vg_name):
401
  """Return node information.
402

403
  This will return memory information and volume group size and free
404
  space.
405

406
  This is a multi-node call.
407

408
  """
409
  c = Client("node_info", [vg_name])
410
  c.connect_list(node_list)
411
  c.run()
412
  retux = c.getresult()
413

    
414
  for node_name in retux:
415
    ret = retux.get(node_name, False)
416
    if type(ret) != dict:
417
      logger.Error("could not connect to node %s" % (node_name))
418
      ret = {}
419

    
420
    utils.CheckDict(ret,
421
                    { 'memory_total' : '-',
422
                      'memory_dom0' : '-',
423
                      'memory_free' : '-',
424
                      'vg_size' : 'node_unreachable',
425
                      'vg_free' : '-' },
426
                    "call_node_info",
427
                    )
428
  return retux
429

    
430

    
431
def call_node_add(node, dsa, dsapub, rsa, rsapub, ssh, sshpub):
432
  """Add a node to the cluster.
433

434
  This is a single-node call.
435

436
  """
437
  params = [dsa, dsapub, rsa, rsapub, ssh, sshpub]
438
  c = Client("node_add", params)
439
  c.connect(node)
440
  c.run()
441
  return c.getresult().get(node, False)
442

    
443

    
444
def call_node_verify(node_list, checkdict):
445
  """Request verification of given parameters.
446

447
  This is a multi-node call.
448

449
  """
450
  c = Client("node_verify", [checkdict])
451
  c.connect_list(node_list)
452
  c.run()
453
  return c.getresult()
454

    
455

    
456
def call_node_start_master(node):
457
  """Tells a node to activate itself as a master.
458

459
  This is a single-node call.
460

461
  """
462
  c = Client("node_start_master", [])
463
  c.connect(node)
464
  c.run()
465
  return c.getresult().get(node, False)
466

    
467

    
468
def call_node_stop_master(node):
469
  """Tells a node to demote itself from master status.
470

471
  This is a single-node call.
472

473
  """
474
  c = Client("node_stop_master", [])
475
  c.connect(node)
476
  c.run()
477
  return c.getresult().get(node, False)
478

    
479

    
480
def call_version(node_list):
481
  """Query node version.
482

483
  This is a multi-node call.
484

485
  """
486
  c = Client("version", [])
487
  c.connect_list(node_list)
488
  c.run()
489
  return c.getresult()
490

    
491

    
492
def call_blockdev_create(node, bdev, size, on_primary, info):
493
  """Request creation of a given block device.
494

495
  This is a single-node call.
496

497
  """
498
  params = [bdev.ToDict(), size, on_primary, info]
499
  c = Client("blockdev_create", params)
500
  c.connect(node)
501
  c.run()
502
  return c.getresult().get(node, False)
503

    
504

    
505
def call_blockdev_remove(node, bdev):
506
  """Request removal of a given block device.
507

508
  This is a single-node call.
509

510
  """
511
  c = Client("blockdev_remove", [bdev.ToDict()])
512
  c.connect(node)
513
  c.run()
514
  return c.getresult().get(node, False)
515

    
516

    
517
def call_blockdev_rename(node, devlist):
518
  """Request rename of the given block devices.
519

520
  This is a single-node call.
521

522
  """
523
  params = [(d.ToDict(), uid) for d, uid in devlist]
524
  c = Client("blockdev_rename", params)
525
  c.connect(node)
526
  c.run()
527
  return c.getresult().get(node, False)
528

    
529

    
530
def call_blockdev_assemble(node, disk, on_primary):
531
  """Request assembling of a given block device.
532

533
  This is a single-node call.
534

535
  """
536
  params = [disk.ToDict(), on_primary]
537
  c = Client("blockdev_assemble", params)
538
  c.connect(node)
539
  c.run()
540
  return c.getresult().get(node, False)
541

    
542

    
543
def call_blockdev_shutdown(node, disk):
544
  """Request shutdown of a given block device.
545

546
  This is a single-node call.
547

548
  """
549
  c = Client("blockdev_shutdown", [disk.ToDict()])
550
  c.connect(node)
551
  c.run()
552
  return c.getresult().get(node, False)
553

    
554

    
555
def call_blockdev_addchildren(node, bdev, ndevs):
556
  """Request adding a list of children to a (mirroring) device.
557

558
  This is a single-node call.
559

560
  """
561
  params = [bdev.ToDict(), [disk.ToDict() for disk in ndevs]]
562
  c = Client("blockdev_addchildren", params)
563
  c.connect(node)
564
  c.run()
565
  return c.getresult().get(node, False)
566

    
567

    
568
def call_blockdev_removechildren(node, bdev, ndevs):
569
  """Request removing a list of children from a (mirroring) device.
570

571
  This is a single-node call.
572

573
  """
574
  params = [bdev.ToDict(), [disk.ToDict() for disk in ndevs]]
575
  c = Client("blockdev_removechildren", params)
576
  c.connect(node)
577
  c.run()
578
  return c.getresult().get(node, False)
579

    
580

    
581
def call_blockdev_getmirrorstatus(node, disks):
582
  """Request status of a (mirroring) device.
583

584
  This is a single-node call.
585

586
  """
587
  params = [dsk.ToDict() for dsk in disks]
588
  c = Client("blockdev_getmirrorstatus", params)
589
  c.connect(node)
590
  c.run()
591
  return c.getresult().get(node, False)
592

    
593

    
594
def call_blockdev_find(node, disk):
595
  """Request identification of a given block device.
596

597
  This is a single-node call.
598

599
  """
600
  c = Client("blockdev_find", [disk.ToDict()])
601
  c.connect(node)
602
  c.run()
603
  return c.getresult().get(node, False)
604

    
605

    
606
def call_upload_file(node_list, file_name):
607
  """Upload a file.
608

609
  The node will refuse the operation in case the file is not on the
610
  approved file list.
611

612
  This is a multi-node call.
613

614
  """
615
  fh = file(file_name)
616
  try:
617
    data = fh.read()
618
  finally:
619
    fh.close()
620
  st = os.stat(file_name)
621
  params = [file_name, data, st.st_mode, st.st_uid, st.st_gid,
622
            st.st_atime, st.st_mtime]
623
  c = Client("upload_file", params)
624
  c.connect_list(node_list)
625
  c.run()
626
  return c.getresult()
627

    
628

    
629
def call_os_diagnose(node_list):
630
  """Request a diagnose of OS definitions.
631

632
  This is a multi-node call.
633

634
  """
635
  c = Client("os_diagnose", [])
636
  c.connect_list(node_list)
637
  c.run()
638
  result = c.getresult()
639
  new_result = {}
640
  for node_name in result:
641
    nr = []
642
    if result[node_name]:
643
      for data in result[node_name]:
644
        if data:
645
          if isinstance(data, dict):
646
            nr.append(objects.OS.FromDict(data))
647
          elif isinstance(data, tuple) and len(data) == 3:
648
            nr.append(errors.InvalidOS(data[0], data[1], data[2]))
649
          else:
650
            raise errors.ProgrammerError("Invalid data from"
651
                                         " xcserver.os_diagnose")
652
    new_result[node_name] = nr
653
  return new_result
654

    
655

    
656
def call_os_get(node_list, name):
657
  """Returns an OS definition.
658

659
  This is a multi-node call.
660

661
  """
662
  c = Client("os_get", [name])
663
  c.connect_list(node_list)
664
  c.run()
665
  result = c.getresult()
666
  new_result = {}
667
  for node_name in result:
668
    data = result[node_name]
669
    if isinstance(data, dict):
670
      new_result[node_name] = objects.OS.FromDict(data)
671
    elif isinstance(data, tuple) and len(data) == 3:
672
      new_result[node_name] = errors.InvalidOS(data[0], data[1], data[2])
673
    else:
674
      new_result[node_name] = data
675
  return new_result
676

    
677

    
678
def call_hooks_runner(node_list, hpath, phase, env):
679
  """Call the hooks runner.
680

681
  Args:
682
    - op: the OpCode instance
683
    - env: a dictionary with the environment
684

685
  This is a multi-node call.
686

687
  """
688
  params = [hpath, phase, env]
689
  c = Client("hooks_runner", params)
690
  c.connect_list(node_list)
691
  c.run()
692
  result = c.getresult()
693
  return result
694

    
695

    
696
def call_blockdev_snapshot(node, cf_bdev):
697
  """Request a snapshot of the given block device.
698

699
  This is a single-node call.
700

701
  """
702
  c = Client("blockdev_snapshot", [cf_bdev.ToDict()])
703
  c.connect(node)
704
  c.run()
705
  return c.getresult().get(node, False)
706

    
707

    
708
def call_snapshot_export(node, snap_bdev, dest_node, instance):
709
  """Request the export of a given snapshot.
710

711
  This is a single-node call.
712

713
  """
714
  params = [snap_bdev.ToDict(), dest_node, instance.ToDict()]
715
  c = Client("snapshot_export", params)
716
  c.connect(node)
717
  c.run()
718
  return c.getresult().get(node, False)
719

    
720

    
721
def call_finalize_export(node, instance, snap_disks):
722
  """Request the completion of an export operation.
723

724
  This writes the export config file, etc.
725

726
  This is a single-node call.
727

728
  """
729
  flat_disks = []
730
  for disk in snap_disks:
731
    flat_disks.append(disk.ToDict())
732
  params = [instance.ToDict(), flat_disks]
733
  c = Client("finalize_export", params)
734
  c.connect(node)
735
  c.run()
736
  return c.getresult().get(node, False)
737

    
738

    
739
def call_export_info(node, path):
740
  """Queries the export information in a given path.
741

742
  This is a single-node call.
743

744
  """
745
  c = Client("export_info", [path])
746
  c.connect(node)
747
  c.run()
748
  result = c.getresult().get(node, False)
749
  if not result:
750
    return result
751
  return objects.SerializableConfigParser.Loads(result)
752

    
753

    
754
def call_instance_os_import(node, inst, osdev, swapdev, src_node, src_image):
755
  """Request the import of a backup into an instance.
756

757
  This is a single-node call.
758

759
  """
760
  params = [inst.ToDict(), osdev, swapdev, src_node, src_image]
761
  c = Client("instance_os_import", params)
762
  c.connect(node)
763
  c.run()
764
  return c.getresult().get(node, False)
765

    
766

    
767
def call_export_list(node_list):
768
  """Gets the stored exports list.
769

770
  This is a multi-node call.
771

772
  """
773
  c = Client("export_list", [])
774
  c.connect_list(node_list)
775
  c.run()
776
  result = c.getresult()
777
  return result
778

    
779

    
780
def call_export_remove(node, export):
781
  """Requests removal of a given export.
782

783
  This is a single-node call.
784

785
  """
786
  c = Client("export_remove", [export])
787
  c.connect(node)
788
  c.run()
789
  return c.getresult().get(node, False)
790

    
791

    
792
def call_node_leave_cluster(node):
793
  """Requests a node to clean the cluster information it has.
794

795
  This will remove the configuration information from the ganeti data
796
  dir.
797

798
  This is a single-node call.
799

800
  """
801
  c = Client("node_leave_cluster", [])
802
  c.connect(node)
803
  c.run()
804
  return c.getresult().get(node, False)
805

    
806

    
807
def call_node_volumes(node_list):
808
  """Gets all volumes on node(s).
809

810
  This is a multi-node call.
811

812
  """
813
  c = Client("node_volumes", [])
814
  c.connect_list(node_list)
815
  c.run()
816
  return c.getresult()