Statistics
| Branch: | Tag: | Revision:

root / lib / rpc.py @ dcb93971

History | View | Annotate | Download (18.3 kB)

1
#!/usr/bin/python
2
#
3

    
4
# Copyright (C) 2006, 2007 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""RPC client library used to run remote procedures on cluster nodes
23

24
"""
25

    
26
# pylint: disable-msg=C0103
27

    
28
import os
29

    
30
from twisted.internet.pollreactor import PollReactor
31

    
32
class ReReactor(PollReactor):
  """PollReactor variant whose run() can be invoked more than once."""

  def run(self, installSignalHandlers=1):
    """Restore per-run state, then delegate to PollReactor.run.

    Two things are re-created when they are found missing or stopped,
    as is the case on the second and later runs of the reactor:

      - the 'shutdown' event triggers (crash and disconnectAll)
      - the thread pool, which is started again and registered to be
        stopped at the next shutdown

    """
    if 'shutdown' not in self._eventTriggers:
      # the shutdown queue is gone, so this is most probably not the
      # first run; re-register the handlers
      for handler in (self.crash, self.disconnectAll):
        self.addSystemEventTrigger('during', 'shutdown', handler)

    pool = self.threadpool
    if pool is not None and pool.joined == 1:
      # the pool was stopped by an earlier shutdown: start it again and
      # make sure it is stopped when this run ends
      pool.start()
      self.addSystemEventTrigger('during', 'shutdown', pool.stop)

    return PollReactor.run(self, installSignalHandlers)
56

    
57

    
58
import twisted.internet.main
59
# Install the restartable reactor at import time; this is done before
# the "from twisted.internet import reactor" statement below so that
# the name imported there refers to our ReReactor instance.
twisted.internet.main.installReactor(ReReactor())
60

    
61
from twisted.spread import pb
62
from twisted.internet import reactor
63
from twisted.cred import credentials
64
from OpenSSL import SSL, crypto
65

    
66
from ganeti import logger
67
from ganeti import utils
68
from ganeti import errors
69
from ganeti import constants
70
from ganeti import objects
71
from ganeti import ssconf
72

    
73
class NodeController:
  """Per-node state holder for a single RPC call.

  One instance is created for every node contacted, which gives the
  callbacks of that node's connection a place to find both the node
  name and the parent client collecting the results.

  """
  def __init__(self, parent, node):
    self.node = node
    self.parent = parent

  def _check_end(self):
    """Stop the reactor once every registered node has a result.

    """
    parent = self.parent
    if len(parent.results) == len(parent.nc):
      reactor.stop()

  def _record_failure(self):
    """Store False as this node's result and check for completion.

    """
    self.parent.results[self.node] = False
    self._check_end()

  def cb_call(self, obj):
    """Callback for a successful connect.

    The connect and login sequence succeeded, so the actual remote
    call is issued on the object we were handed.

    """
    parent = self.parent
    pending = obj.callRemote(parent.procedure, parent.args)
    pending.addCallbacks(self.cb_done, self.cb_err2)

  def cb_done(self, result):
    """Callback for a successful call.

    The result is a (traceback, value) pair. The value is always
    stored; if a traceback is present a generic RemoteError is raised
    afterwards, since the actual remote exception cannot be passed
    over yet.

    """
    tb, value = result
    self.parent.results[self.node] = value
    self._check_end()
    if not tb:
      return
    raise errors.RemoteError("Remote procedure error calling %s on %s:"
                             "\n%s" % (self.parent.procedure,
                                       self.node,
                                       tb))

  def cb_err1(self, reason):
    """Error callback for an unsuccessful connect.

    """
    logger.Error("caller_connect: could not connect to remote host %s,"
                 " reason %s" % (self.node, reason))
    self._record_failure()

  def cb_err2(self, reason):
    """Error callback for an unsuccessful call.

    This is reached when the call did not return anything, not even an
    error, or when it timed out, etc.

    """
    logger.Error("caller_call: could not call %s on node %s,"
                 " reason %s" % (self.parent.procedure, self.node, reason))
    self._record_failure()
139

    
140

    
141
class MirrorContextFactory:
  """Certificate verifier factory.

  This factory creates SSL contexts that accept the remote end only if
  it presents one specific certificate: the one stored in
  constants.SSL_CERT_FILE (i.e. our own certificate).

  The checks done are that the PEM dump of the peer certificate is the
  same as our own and (somewhat redundantly) that the SHA digest is
  the same.

  """
  isClient = 1

  def __init__(self):
    """Load our certificate and cache its PEM dump and SHA digest.

    Raises errors.ConfigurationError if the certificate file cannot be
    opened or read.

    """
    # local import: only needed for the version-independent handler below
    import sys

    try:
      fd = open(constants.SSL_CERT_FILE, 'r')
      try:
        # at most 16 KiB of the file are read
        data = fd.read(16384)
      finally:
        fd.close()
    except EnvironmentError:
      # sys.exc_info() is used instead of the "except X, err" form so
      # that the syntax is accepted by every Python version
      err = sys.exc_info()[1]
      raise errors.ConfigurationError("missing SSL certificate: %s" %
                                      str(err))
    self.mycert = crypto.load_certificate(crypto.FILETYPE_PEM, data)
    self.mypem = crypto.dump_certificate(crypto.FILETYPE_PEM, self.mycert)
    self.mydigest = self.mycert.digest('SHA')

  def verifier(self, conn, x509, errno, err_depth, retcode):
    """Certificate verify callback.

    Returns True only if both the SHA digest and the PEM dump of x509
    equal those of our own certificate. The conn, errno, err_depth and
    retcode arguments are not looked at, so the decision rests on the
    certificate comparison alone.

    """
    if self.mydigest != x509.digest('SHA'):
      return False
    if crypto.dump_certificate(crypto.FILETYPE_PEM, x509) != self.mypem:
      return False
    return True

  def getContext(self):
    """Return a new TLSv1 context that verifies the peer with verifier().

    """
    context = SSL.Context(SSL.TLSv1_METHOD)
    context.set_verify(SSL.VERIFY_PEER, self.verifier)
    return context
185

    
186
class Client:
  """RPC Client class.

  Given a (remote) method name and a list of parameters, plus the
  nodes added via connect()/connect_list(), this class contacts all
  nodes in parallel and collects a dict of results (key: node name,
  value: result).

  One current bug is that a generic failure is still signalled by a
  'False' result, which is not good: this overloading of values can
  cause bugs, as False may also be a legitimate return value.

  """
  result_set = False
  result = False
  allresult = []

  def __init__(self, procedure, args):
    """Remember the call details and read the node daemon settings.

    The daemon port and password are obtained from ssconf.SimpleStore.

    """
    store = ssconf.SimpleStore()
    self.port = store.GetNodeDaemonPort()
    self.nodepw = store.GetNodeDaemonPassword()
    self.procedure = procedure
    self.args = args
    self.nc = {}
    self.results = {}

  #--- generic connector -------------

  def connect_list(self, node_list):
    """Add every node of a list to the target nodes.

    """
    for node_name in node_list:
      self.connect(node_name)

  def connect(self, connect_node):
    """Add a node to the target list.

    An SSL connection to the node is scheduled on the reactor and a
    login as "master_node" is queued on it; the node's controller
    receives the outcome of the login.

    """
    factory = pb.PBClientFactory()
    controller = NodeController(self, connect_node)
    self.nc[connect_node] = controller
    reactor.connectSSL(connect_node, self.port, factory,
                       MirrorContextFactory())
    creds = credentials.UsernamePassword("master_node", self.nodepw)
    login = factory.login(creds)
    login.addCallbacks(controller.cb_call, controller.cb_err1)

  def getresult(self):
    """Return the dict of results collected so far.

    """
    return self.results

  def run(self):
    """Wrapper over reactor.run().

    The reactor is only run if at least one node has been added;
    without queued requests this method does nothing.

    """
    if not self.nc:
      return
    reactor.run()
247

    
248

    
249
def call_volume_list(node_list, vg_name):
  """Get the logical volumes present in a given volume group.

  This is a multi-node call: "volume_list" is invoked on every node of
  node_list with vg_name as its only argument, and the dict of
  per-node results is returned.

  """
  client = Client("volume_list", [vg_name])
  client.connect_list(node_list)
  client.run()
  results = client.getresult()
  return results
259

    
260

    
261
def call_vg_list(node_list):
  """Get the volume group list.

  This is a multi-node call: "vg_list" is invoked without arguments on
  every node of node_list and the dict of per-node results is
  returned.

  """
  client = Client("vg_list", [])
  client.connect_list(node_list)
  client.run()
  results = client.getresult()
  return results
271

    
272

    
273
def call_bridges_exist(node, bridges_list):
  """Check whether a node has all the given bridges.

  The remote node is asked whether every bridge in bridges_list is
  present on it, so that an instance using interfaces on those bridges
  can be started there.

  This is a single-node call; False is returned if there is no result
  for the node.

  """
  client = Client("bridges_exist", [bridges_list])
  client.connect(node)
  client.run()
  results = client.getresult()
  return results.get(node, False)
287

    
288

    
289
def call_instance_start(node, instance, extra_args):
  """Start an instance.

  The instance is sent in the form returned by its Dumps() method,
  followed by extra_args.

  This is a single-node call; False is returned if there is no result
  for the node.

  """
  params = [instance.Dumps(), extra_args]
  client = Client("instance_start", params)
  client.connect(node)
  client.run()
  results = client.getresult()
  return results.get(node, False)
299

    
300

    
301
def call_instance_shutdown(node, instance):
  """Stop an instance.

  The instance is sent in the form returned by its Dumps() method.

  This is a single-node call; False is returned if there is no result
  for the node.

  """
  params = [instance.Dumps()]
  client = Client("instance_shutdown", params)
  client.connect(node)
  client.run()
  results = client.getresult()
  return results.get(node, False)
311

    
312

    
313
def call_instance_os_add(node, inst, osdev, swapdev):
  """Install an OS on the given instance.

  The instance is sent in the form returned by its Dumps() method;
  osdev and swapdev are passed through unchanged.

  This is a single-node call; False is returned if there is no result
  for the node.

  """
  client = Client("instance_os_add", [inst.Dumps(), osdev, swapdev])
  client.connect(node)
  client.run()
  results = client.getresult()
  return results.get(node, False)
324

    
325

    
326
def call_instance_info(node, instance):
  """Return information about a single instance.

  The instance argument is passed to the remote procedure as given.

  This is a single-node call; False is returned if there is no result
  for the node.

  """
  client = Client("instance_info", [instance])
  client.connect(node)
  client.run()
  results = client.getresult()
  return results.get(node, False)
336

    
337

    
338
def call_all_instances_info(node_list):
  """Return information about all instances on the given nodes.

  This is a multi-node call: "all_instances_info" is invoked without
  arguments on every node of node_list and the dict of per-node
  results is returned.

  """
  client = Client("all_instances_info", [])
  client.connect_list(node_list)
  client.run()
  results = client.getresult()
  return results
348

    
349

    
350
def call_instance_list(node_list):
  """Return the list of running instances on the given nodes.

  This is a multi-node call: "instance_list" is invoked without
  arguments on every node of node_list and the dict of per-node
  results is returned.

  """
  client = Client("instance_list", [])
  client.connect_list(node_list)
  client.run()
  results = client.getresult()
  return results
360

    
361

    
362
def call_node_info(node_list, vg_name):
  """Return node information.

  This will return memory information and volume group size and free
  space, as reported by the "node_info" remote procedure called with
  vg_name.

  This is a multi-node call; the dict of per-node results is returned.
  Every result that is not a dict is logged as a connection failure.

  """
  c = Client("node_info", [vg_name])
  c.connect_list(node_list)
  c.run()
  retux = c.getresult()

  for node_name in retux:
    ret = retux[node_name]
    if not isinstance(ret, dict):
      logger.Error("could not connect to node %s" % (node_name))
      # NOTE(review): this substitute dict is only handed to CheckDict
      # below and is never stored back into retux, so callers still see
      # the original non-dict value for this node. Kept as is because
      # callers may rely on a false result to detect the failure.
      ret = {}

    # presumably fills in the given defaults for missing keys - confirm
    # against utils.CheckDict
    utils.CheckDict(ret,
                    { 'memory_total' : '-',
                      'memory_dom0' : '-',
                      'memory_free' : '-',
                      'vg_size' : 'node_unreachable',
                      'vg_free' : '-' },
                    "call_node_info",
                    )
  return retux
391

    
392

    
393
def call_node_add(node, dsa, dsapub, rsa, rsapub, ssh, sshpub):
  """Add a node to the cluster.

  The six key arguments are passed to the remote "node_add" procedure
  in the order dsa, dsapub, rsa, rsapub, ssh, sshpub.

  This is a single-node call; False is returned if there is no result
  for the node.

  """
  client = Client("node_add", [dsa, dsapub, rsa, rsapub, ssh, sshpub])
  client.connect(node)
  client.run()
  results = client.getresult()
  return results.get(node, False)
404

    
405

    
406
def call_node_verify(node_list, checkdict):
  """Request verification of the given parameters.

  This is a multi-node call: "node_verify" is invoked with checkdict
  on every node of node_list and the dict of per-node results is
  returned.

  """
  client = Client("node_verify", [checkdict])
  client.connect_list(node_list)
  client.run()
  results = client.getresult()
  return results
416

    
417

    
418
def call_node_start_master(node):
  """Tell a node to activate itself as a master.

  This is a single-node call; False is returned if there is no result
  for the node.

  """
  client = Client("node_start_master", [])
  client.connect(node)
  client.run()
  results = client.getresult()
  return results.get(node, False)
428

    
429

    
430
def call_node_stop_master(node):
  """Tell a node to demote itself from master status.

  This is a single-node call; False is returned if there is no result
  for the node.

  """
  client = Client("node_stop_master", [])
  client.connect(node)
  client.run()
  results = client.getresult()
  return results.get(node, False)
440

    
441

    
442
def call_version(node_list):
  """Query the node version.

  This is a multi-node call: "version" is invoked without arguments on
  every node of node_list and the dict of per-node results is
  returned.

  """
  client = Client("version", [])
  client.connect_list(node_list)
  client.run()
  results = client.getresult()
  return results
452

    
453

    
454
def call_configfile_list(node_list):
  """Return the list of existing configuration files.

  This is a multi-node call: "configfile_list" is invoked without
  arguments on every node of node_list and the dict of per-node
  results is returned.

  """
  client = Client("configfile_list", [])
  client.connect_list(node_list)
  client.run()
  results = client.getresult()
  return results
464

    
465
def call_blockdev_create(node, bdev, size, on_primary):
  """Request creation of a given block device.

  The device is sent in the form returned by its Dumps() method; size
  and on_primary are passed through unchanged.

  This is a single-node call; False is returned if there is no result
  for the node.

  """
  client = Client("blockdev_create", [bdev.Dumps(), size, on_primary])
  client.connect(node)
  client.run()
  results = client.getresult()
  return results.get(node, False)
476

    
477

    
478
def call_blockdev_remove(node, bdev):
  """Request removal of a given block device.

  The device is sent in the form returned by its Dumps() method.

  This is a single-node call; False is returned if there is no result
  for the node.

  """
  params = [bdev.Dumps()]
  client = Client("blockdev_remove", params)
  client.connect(node)
  client.run()
  results = client.getresult()
  return results.get(node, False)
488

    
489

    
490
def call_blockdev_assemble(node, disk, on_primary):
  """Request assembling of a given block device.

  The disk is sent in the form returned by its Dumps() method;
  on_primary is passed through unchanged.

  This is a single-node call; False is returned if there is no result
  for the node.

  """
  client = Client("blockdev_assemble", [disk.Dumps(), on_primary])
  client.connect(node)
  client.run()
  results = client.getresult()
  return results.get(node, False)
501

    
502

    
503
def call_blockdev_shutdown(node, disk):
  """Request shutdown of a given block device.

  The disk is sent in the form returned by its Dumps() method.

  This is a single-node call; False is returned if there is no result
  for the node.

  """
  params = [disk.Dumps()]
  client = Client("blockdev_shutdown", params)
  client.connect(node)
  client.run()
  results = client.getresult()
  return results.get(node, False)
513

    
514

    
515
def call_blockdev_addchild(node, bdev, ndev):
  """Request adding a new child to a (mirroring) device.

  Both the parent device (bdev) and the new child (ndev) are sent in
  the form returned by their Dumps() method.

  This is a single-node call; False is returned if there is no result
  for the node.

  """
  client = Client("blockdev_addchild", [bdev.Dumps(), ndev.Dumps()])
  client.connect(node)
  client.run()
  results = client.getresult()
  return results.get(node, False)
526

    
527

    
528
def call_blockdev_removechild(node, bdev, ndev):
  """Request removing a child from a (mirroring) device.

  Both the parent device (bdev) and the child (ndev) are sent in the
  form returned by their Dumps() method.

  This is a single-node call; False is returned if there is no result
  for the node.

  """
  client = Client("blockdev_removechild", [bdev.Dumps(), ndev.Dumps()])
  client.connect(node)
  client.run()
  results = client.getresult()
  return results.get(node, False)
539

    
540

    
541
def call_blockdev_getmirrorstatus(node, disks):
  """Request the status of (mirroring) devices.

  Every element of disks is sent in the form returned by its Dumps()
  method; each dumped disk is a separate argument of the remote call.

  This is a single-node call; False is returned if there is no result
  for the node.

  """
  dumped = [one_disk.Dumps() for one_disk in disks]
  client = Client("blockdev_getmirrorstatus", dumped)
  client.connect(node)
  client.run()
  results = client.getresult()
  return results.get(node, False)
552

    
553

    
554
def call_blockdev_find(node, disk):
  """Request identification of a given block device.

  The disk is sent in the form returned by its Dumps() method.

  This is a single-node call; False is returned if there is no result
  for the node.

  """
  params = [disk.Dumps()]
  client = Client("blockdev_find", params)
  client.connect(node)
  client.run()
  results = client.getresult()
  return results.get(node, False)
564

    
565

    
566
def call_upload_file(node_list, file_name):
  """Upload a file.

  The whole local file is read and sent together with its name, mode,
  owner uid/gid and access/modification times. The node will refuse
  the operation in case the file is not on the approved file list.

  This is a multi-node call; the dict of per-node results is returned.
  Errors while opening or reading the local file propagate to the
  caller.

  """
  fh = open(file_name)
  try:
    data = fh.read()
    # take the metadata from the open handle, so that it describes the
    # same file object the data was read from
    st = os.fstat(fh.fileno())
  finally:
    fh.close()
  params = [file_name, data, st.st_mode, st.st_uid, st.st_gid,
            st.st_atime, st.st_mtime]
  c = Client("upload_file", params)
  c.connect_list(node_list)
  c.run()
  return c.getresult()
587

    
588

    
589
def call_os_diagnose(node_list):
  """Request a diagnose of OS definitions.

  This is a multi-node call. The returned dict maps every node name to
  a list built from that node's answer:

    - string entries are converted with objects.ConfigObject.Loads
    - two-element tuples are converted to errors.InvalidOS instances
    - false entries are skipped

  A node with a false result (e.g. a failed call) gets an empty list.

  Raises errors.ProgrammerError for any other kind of entry.

  """
  c = Client("os_diagnose", [])
  c.connect_list(node_list)
  c.run()
  result = c.getresult()
  new_result = {}
  for node_name in result:
    nr = []
    # a false node result is treated like an empty answer
    for data in result[node_name] or ():
      if not data:
        continue
      if isinstance(data, basestring):
        nr.append(objects.ConfigObject.Loads(data))
      elif isinstance(data, tuple) and len(data) == 2:
        nr.append(errors.InvalidOS(data[0], data[1]))
      else:
        raise errors.ProgrammerError("Invalid data from"
                                     " xcserver.os_diagnose")
    new_result[node_name] = nr
  return new_result
614

    
615

    
616
def call_os_get(node_list, name):
  """Return an OS definition.

  This is a multi-node call. In the returned dict, a string result is
  converted with objects.ConfigObject.Loads, a two-element tuple is
  converted to an errors.InvalidOS instance, and any other value is
  passed through unchanged.

  """
  client = Client("os_get", [name])
  client.connect_list(node_list)
  client.run()
  converted = {}
  for node_name, data in client.getresult().items():
    if isinstance(data, basestring):
      value = objects.ConfigObject.Loads(data)
    elif isinstance(data, tuple) and len(data) == 2:
      value = errors.InvalidOS(data[0], data[1])
    else:
      value = data
    converted[node_name] = value
  return converted
636

    
637

    
638
def call_hooks_runner(node_list, hpath, phase, env):
  """Call the hooks runner.

  Args:
    - node_list: the nodes on which to run the hooks
    - hpath, phase: passed unchanged to the remote "hooks_runner"
      procedure (presumably the hooks path and the hook phase -
      confirm against the node daemon)
    - env: a dictionary with the environment

  This is a multi-node call; the dict of per-node results is returned.

  """
  client = Client("hooks_runner", [hpath, phase, env])
  client.connect_list(node_list)
  client.run()
  return client.getresult()
654

    
655

    
656
def call_blockdev_snapshot(node, cf_bdev):
  """Request a snapshot of the given block device.

  The device is sent in the form returned by its Dumps() method.

  This is a single-node call; False is returned if there is no result
  for the node.

  """
  params = [cf_bdev.Dumps()]
  client = Client("blockdev_snapshot", params)
  client.connect(node)
  client.run()
  results = client.getresult()
  return results.get(node, False)
666

    
667

    
668
def call_snapshot_export(node, snap_bdev, dest_node, instance):
  """Request the export of a given snapshot.

  The snapshot device and the instance are sent in the form returned
  by their Dumps() method; dest_node is passed through unchanged.

  This is a single-node call; False is returned if there is no result
  for the node.

  """
  client = Client("snapshot_export",
                  [snap_bdev.Dumps(), dest_node, instance.Dumps()])
  client.connect(node)
  client.run()
  results = client.getresult()
  return results.get(node, False)
679

    
680

    
681
def call_finalize_export(node, instance, snap_disks):
  """Request the completion of an export operation.

  This writes the export config file, etc. The instance and every
  disk of snap_disks are sent in the form returned by their Dumps()
  method; the dumped disks travel as one list argument.

  This is a single-node call; False is returned if there is no result
  for the node.

  """
  dumped_disks = [snap.Dumps() for snap in snap_disks]
  client = Client("finalize_export", [instance.Dumps(), dumped_disks])
  client.connect(node)
  client.run()
  results = client.getresult()
  return results.get(node, False)
697

    
698

    
699
def call_export_info(node, path):
  """Query the export information in a given path.

  This is a single-node call. A true result is converted with
  objects.SerializableConfigParser.Loads before being returned; a
  false result (including the False used when the node has no result)
  is returned unchanged.

  """
  client = Client("export_info", [path])
  client.connect(node)
  client.run()
  raw = client.getresult().get(node, False)
  if raw:
    return objects.SerializableConfigParser.Loads(raw)
  return raw
712

    
713

    
714
def call_instance_os_import(node, inst, osdev, swapdev, src_node, src_image):
  """Request the import of a backup into an instance.

  The instance is sent in the form returned by its Dumps() method;
  the remaining arguments are passed through unchanged.

  This is a single-node call; False is returned if there is no result
  for the node.

  """
  client = Client("instance_os_import",
                  [inst.Dumps(), osdev, swapdev, src_node, src_image])
  client.connect(node)
  client.run()
  results = client.getresult()
  return results.get(node, False)
725

    
726

    
727
def call_export_list(node_list):
  """Get the stored exports list.

  This is a multi-node call: "export_list" is invoked without
  arguments on every node of node_list and the dict of per-node
  results is returned.

  """
  client = Client("export_list", [])
  client.connect_list(node_list)
  client.run()
  return client.getresult()
738

    
739

    
740
def call_export_remove(node, export):
  """Request removal of a given export.

  The export argument is passed to the remote procedure as given.

  This is a single-node call; False is returned if there is no result
  for the node.

  """
  client = Client("export_remove", [export])
  client.connect(node)
  client.run()
  results = client.getresult()
  return results.get(node, False)
750

    
751

    
752
def call_node_leave_cluster(node):
  """Request a node to clean the cluster information it has.

  This will remove the configuration information from the ganeti data
  dir.

  This is a single-node call; False is returned if there is no result
  for the node.

  """
  client = Client("node_leave_cluster", [])
  client.connect(node)
  client.run()
  results = client.getresult()
  return results.get(node, False)
765

    
766

    
767
def call_node_volumes(node_list):
  """Get all volumes on the given node(s).

  This is a multi-node call: "node_volumes" is invoked without
  arguments on every node of node_list and the dict of per-node
  results is returned.

  """
  client = Client("node_volumes", [])
  client.connect_list(node_list)
  client.run()
  results = client.getresult()
  return results