Statistics
| Branch: | Tag: | Revision:

root / lib / rpc.py @ cb91d46e

History | View | Annotate | Download (18.1 kB)

1
#!/usr/bin/python
2
#
3

    
4
# Copyright (C) 2006, 2007 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Client-side RPC helpers for calling procedures on cluster nodes.
23

24
"""
25

    
26
# pylint: disable-msg=C0103
27

    
28
import os
29

    
30
from twisted.internet.pollreactor import PollReactor
31

    
32
class ReReactor(PollReactor):
  """Poll-based reactor that can be run more than once."""

  def run(self, installSignalHandlers=1):
    """Run the reactor, repairing state left by a previous run.

    Before handing control to PollReactor.run, this registers the
    'shutdown' system event triggers again if they are missing and
    restarts the thread pool if it has already been joined; both
    conditions are expected on the second and later runs of the
    reactor.

    """
    if 'shutdown' not in self._eventTriggers:
      # the shutdown triggers are gone, so this is most probably not
      # the first run: register them again
      for handler in (self.crash, self.disconnectAll):
        self.addSystemEventTrigger('during', 'shutdown', handler)

    pool = self.threadpool
    if pool is not None and pool.joined == 1:
      # a stopped thread pool is started again, and scheduled to be
      # stopped at the next reactor shutdown
      pool.start()
      self.addSystemEventTrigger('during', 'shutdown', pool.stop)

    return PollReactor.run(self, installSignalHandlers)
56

    
57

    
58
import twisted.internet.main
59
# Install a ReReactor instance as the global Twisted reactor.
# NOTE(review): presumably this has to run before the
# "from twisted.internet import reactor" import further down, so that
# the import picks up this instance instead of a default reactor --
# confirm before reordering the imports.
twisted.internet.main.installReactor(ReReactor())
60

    
61
from twisted.spread import pb
62
from twisted.internet import reactor
63
from twisted.cred import credentials
64
from OpenSSL import SSL, crypto
65

    
66
from ganeti import logger
67
from ganeti import utils
68
from ganeti import errors
69
from ganeti import constants
70
from ganeti import objects
71
from ganeti import ssconf
72

    
73
class NodeController:
  """Per-node state holder for a single remote call.

  One instance is created for every node a client talks to. It keeps
  the node name together with a reference to the owning client, and
  provides the callbacks that record the outcome for that node in the
  owner's results.

  """
  def __init__(self, parent, node):
    self.node = node
    self.parent = parent

  def _check_end(self):
    """Stop the reactor once every target node has a result.

    """
    owner = self.parent
    if len(owner.results) == len(owner.nc):
      reactor.stop()

  def cb_call(self, obj):
    """Callback for a successful connect.

    Once the connect and login sequence has succeeded, the actual
    remote call is issued on the object handed to us, and the result
    and error handlers are attached to it.

    """
    owner = self.parent
    pending = obj.callRemote(owner.procedure, owner.args)
    pending.addCallbacks(self.cb_done, self.cb_err2)

  def cb_done(self, result):
    """Callback for a completed call.

    The result is a (traceback, value) pair. The value is always
    stored as the result for this node; if the traceback is set, a
    generic RemoteError is raised afterwards (the actual exception
    cannot be passed over yet).

    """
    tb, value = result
    self.parent.results[self.node] = value
    self._check_end()
    if not tb:
      return
    raise errors.RemoteError("Remote procedure error calling %s on %s:"
                             "\n%s" % (self.parent.procedure,
                                       self.node,
                                       tb))

  def cb_err1(self, reason):
    """Error callback for an unsuccessful connect.

    The failure is logged and False is stored as this node's result.

    """
    message = ("caller_connect: could not connect to remote host %s,"
               " reason %s" % (self.node, reason))
    logger.Error(message)
    self.parent.results[self.node] = False
    self._check_end()

  def cb_err2(self, reason):
    """Error callback for an unsuccessful call.

    This is used when the call didn't return anything, not even an
    error, or when it timed out, etc. The failure is logged and False
    is stored as this node's result.

    """
    message = ("caller_call: could not call %s on node %s,"
               " reason %s" % (self.parent.procedure, self.node, reason))
    logger.Error(message)
    self.parent.results[self.node] = False
    self._check_end()
139

    
140

    
141
class MirrorContextFactory:
142
  """Certificate verifier factory.
143

144
  This factory creates contexts that verify if the remote end has a
145
  specific certificate (i.e. our own certificate).
146

147
  The checks we do are that the PEM dump of the certificate is the
148
  same as our own and (somewhat redundantly) that the SHA checksum is
149
  the same.
150

151
  """
152
  isClient = 1
153

    
154
  def __init__(self):
155
    try:
156
      fd = open(constants.SSL_CERT_FILE, 'r')
157
      try:
158
        data = fd.read(16384)
159
      finally:
160
        fd.close()
161
    except EnvironmentError, err:
162
      raise errors.ConfigurationError, ("missing SSL certificate: %s" %
163
                                        str(err))
164
    self.mycert = crypto.load_certificate(crypto.FILETYPE_PEM, data)
165
    self.mypem = crypto.dump_certificate(crypto.FILETYPE_PEM, self.mycert)
166
    self.mydigest = self.mycert.digest('SHA')
167

    
168
  def verifier(self, conn, x509, errno, err_depth, retcode):
169
    """Certificate verify method.
170

171
    """
172
    if self.mydigest != x509.digest('SHA'):
173
      return False
174
    if crypto.dump_certificate(crypto.FILETYPE_PEM, x509) != self.mypem:
175
      return False
176
    return True
177

    
178
  def getContext(self):
179
    """Context generator.
180

181
    """
182
    context = SSL.Context(SSL.TLSv1_METHOD)
183
    context.set_verify(SSL.VERIFY_PEER, self.verifier)
184
    return context
185

    
186
class Client:
  """RPC Client class.

  Given a (remote) method name and a list of parameters, plus the
  nodes added through connect() or connect_list(), this class
  contacts all nodes in parallel and collects a dict of results (key:
  node name, value: result).

  One current bug is that a generic failure is still signalled by a
  'False' result, which is not good: a call that legitimately returns
  False cannot be told apart from a failed one.

  """
  result_set = False
  result = False
  allresult = []

  def __init__(self, procedure, args):
    """Prepare a call of the given procedure with the given arguments.

    The node daemon port and password are read from the simple store.

    """
    self.procedure = procedure
    self.args = args
    self.nc = {}
    self.results = {}
    store = ssconf.SimpleStore()
    self.port = store.GetNodeDaemonPort()
    self.nodepw = store.GetNodeDaemonPassword()

  #--- generic connector -------------

  def connect_list(self, node_list):
    """Add every node of a list to the target nodes.

    """
    for node_name in node_list:
      self.connect(node_name)

  def connect(self, connect_node):
    """Add a single node to the target nodes.

    A controller is registered for the node, an SSL connection to its
    node daemon is requested and the login is queued; the remote call
    is made from the controller's callbacks.

    """
    factory = pb.PBClientFactory()
    controller = NodeController(self, connect_node)
    self.nc[connect_node] = controller
    reactor.connectSSL(connect_node, self.port, factory,
                       MirrorContextFactory())
    creds = credentials.UsernamePassword("master_node", self.nodepw)
    login = factory.login(creds)
    login.addCallbacks(controller.cb_call, controller.cb_err1)

  def getresult(self):
    """Return the dict of results collected so far.

    """
    return self.results

  def run(self):
    """Wrapper over reactor.run().

    The reactor is only run if at least one node has been added;
    without any queued request this does nothing.

    """
    if not self.nc:
      return
    reactor.run()
247

    
248

    
249
def call_volume_list(node_list, vg_name):
  """Query the logical volumes of a volume group on several nodes.

  This is a multi-node call: the result is a dict keyed by node name,
  with False as the value for nodes that could not be queried.

  """
  client = Client("volume_list", [vg_name])
  client.connect_list(node_list)
  client.run()
  return client.getresult()
259

    
260

    
261
def call_vg_list(node_list):
  """Query the volume group list on several nodes.

  This is a multi-node call: the result is a dict keyed by node name,
  with False as the value for nodes that could not be queried.

  """
  client = Client("vg_list", [])
  client.connect_list(node_list)
  client.run()
  return client.getresult()
271

    
272

    
273
def call_bridges_exist(node, bridges_list):
  """Check whether a node has all the given bridges.

  The remote node is asked whether every bridge in bridges_list is
  present, so that an instance using interfaces on those bridges can
  be started there.

  This is a single-node call: the node's result is returned, or False
  if the call failed.

  """
  client = Client("bridges_exist", [bridges_list])
  client.connect(node)
  client.run()
  results = client.getresult()
  return results.get(node, False)
287

    
288

    
289
def call_instance_start(node, instance, extra_args):
  """Start an instance.

  The instance is sent in its serialized (Dumps) form together with
  extra_args.

  This is a single-node call: the node's result is returned, or False
  if the call failed.

  """
  client = Client("instance_start", [instance.Dumps(), extra_args])
  client.connect(node)
  client.run()
  results = client.getresult()
  return results.get(node, False)
299

    
300

    
301
def call_instance_shutdown(node, instance):
  """Stop an instance.

  The instance is sent in its serialized (Dumps) form.

  This is a single-node call: the node's result is returned, or False
  if the call failed.

  """
  client = Client("instance_shutdown", [instance.Dumps()])
  client.connect(node)
  client.run()
  results = client.getresult()
  return results.get(node, False)
311

    
312

    
313
def call_instance_os_add(node, inst, osdev, swapdev):
  """Install an OS on the given instance.

  The instance is sent in its serialized (Dumps) form; osdev and
  swapdev are passed through unchanged.

  This is a single-node call: the node's result is returned, or False
  if the call failed.

  """
  client = Client("instance_os_add", [inst.Dumps(), osdev, swapdev])
  client.connect(node)
  client.run()
  results = client.getresult()
  return results.get(node, False)
324

    
325

    
326
def call_instance_info(node, instance):
  """Query information about a single instance.

  This is a single-node call: the node's result is returned, or False
  if the call failed.

  """
  client = Client("instance_info", [instance])
  client.connect(node)
  client.run()
  results = client.getresult()
  return results.get(node, False)
336

    
337

    
338
def call_all_instances_info(node_list):
  """Query information about all instances on each of the given nodes.

  This is a multi-node call: the result is a dict keyed by node name,
  with False as the value for nodes that could not be queried.

  """
  client = Client("all_instances_info", [])
  client.connect_list(node_list)
  client.run()
  return client.getresult()
348

    
349

    
350
def call_instance_list(node_list):
  """Query the list of running instances on each of the given nodes.

  This is a multi-node call: the result is a dict keyed by node name,
  with False as the value for nodes that could not be queried.

  """
  client = Client("instance_list", [])
  client.connect_list(node_list)
  client.run()
  return client.getresult()
360

    
361

    
362
def call_node_info(node_list, vg_name):
  """Return node information.

  The nodes are asked for memory information and for the size and
  free space of the given volume group.

  Every per-node result is passed through utils.CheckDict together
  with a set of placeholder values (presumably to fill in the keys
  that are missing -- see utils.CheckDict). A result which is not a
  dict is logged as a connection failure and a fresh empty dict is
  checked in its place; that dict is local to this function, so the
  entry returned for such a node stays as it was received.

  This is a multi-node call: the result is a dict keyed by node name.

  """
  client = Client("node_info", [vg_name])
  client.connect_list(node_list)
  client.run()
  all_info = client.getresult()

  for node_name in all_info:
    node_info = all_info.get(node_name, False)
    if type(node_info) != dict:
      logger.Error("could not connect to node %s" % node_name)
      # placeholder only: it is not stored back into all_info
      node_info = {}

    placeholders = {
      'memory_total' : '-',
      'memory_dom0' : '-',
      'memory_free' : '-',
      'vg_size' : 'node_unreachable',
      'vg_free' : '-',
      }
    utils.CheckDict(node_info, placeholders, "call_node_info")
  return all_info
391

    
392

    
393
def call_node_add(node, dsa, dsapub, rsa, rsapub, ssh, sshpub):
  """Add a node to the cluster.

  The six key arguments are passed to the node unchanged, in the
  order dsa, dsapub, rsa, rsapub, ssh, sshpub.

  This is a single-node call: the node's result is returned, or False
  if the call failed.

  """
  client = Client("node_add", [dsa, dsapub, rsa, rsapub, ssh, sshpub])
  client.connect(node)
  client.run()
  results = client.getresult()
  return results.get(node, False)
404

    
405

    
406
def call_node_verify(node_list, checkdict):
  """Request verification of the given parameters on several nodes.

  This is a multi-node call: the result is a dict keyed by node name,
  with False as the value for nodes that could not be queried.

  """
  client = Client("node_verify", [checkdict])
  client.connect_list(node_list)
  client.run()
  return client.getresult()
416

    
417

    
418
def call_node_start_master(node):
  """Tell a node to activate itself as a master.

  This is a single-node call: the node's result is returned, or False
  if the call failed.

  """
  client = Client("node_start_master", [])
  client.connect(node)
  client.run()
  results = client.getresult()
  return results.get(node, False)
428

    
429

    
430
def call_node_stop_master(node):
  """Tell a node to demote itself from master status.

  This is a single-node call: the node's result is returned, or False
  if the call failed.

  """
  client = Client("node_stop_master", [])
  client.connect(node)
  client.run()
  results = client.getresult()
  return results.get(node, False)
440

    
441

    
442
def call_version(node_list):
  """Query the version of several nodes.

  This is a multi-node call: the result is a dict keyed by node name,
  with False as the value for nodes that could not be queried.

  """
  client = Client("version", [])
  client.connect_list(node_list)
  client.run()
  return client.getresult()
452

    
453

    
454
def call_blockdev_create(node, bdev, size, on_primary):
  """Request the creation of a given block device.

  The device is sent in its serialized (Dumps) form; size and
  on_primary are passed through unchanged.

  This is a single-node call: the node's result is returned, or False
  if the call failed.

  """
  client = Client("blockdev_create", [bdev.Dumps(), size, on_primary])
  client.connect(node)
  client.run()
  results = client.getresult()
  return results.get(node, False)
465

    
466

    
467
def call_blockdev_remove(node, bdev):
  """Request the removal of a given block device.

  The device is sent in its serialized (Dumps) form.

  This is a single-node call: the node's result is returned, or False
  if the call failed.

  """
  client = Client("blockdev_remove", [bdev.Dumps()])
  client.connect(node)
  client.run()
  results = client.getresult()
  return results.get(node, False)
477

    
478

    
479
def call_blockdev_assemble(node, disk, on_primary):
  """Request the assembling of a given block device.

  The disk is sent in its serialized (Dumps) form; on_primary is
  passed through unchanged.

  This is a single-node call: the node's result is returned, or False
  if the call failed.

  """
  client = Client("blockdev_assemble", [disk.Dumps(), on_primary])
  client.connect(node)
  client.run()
  results = client.getresult()
  return results.get(node, False)
490

    
491

    
492
def call_blockdev_shutdown(node, disk):
  """Request the shutdown of a given block device.

  The disk is sent in its serialized (Dumps) form.

  This is a single-node call: the node's result is returned, or False
  if the call failed.

  """
  client = Client("blockdev_shutdown", [disk.Dumps()])
  client.connect(node)
  client.run()
  results = client.getresult()
  return results.get(node, False)
502

    
503

    
504
def call_blockdev_addchild(node, bdev, ndev):
  """Request adding a new child to a (mirroring) device.

  Both the device (bdev) and the new child (ndev) are sent in their
  serialized (Dumps) form.

  This is a single-node call: the node's result is returned, or False
  if the call failed.

  """
  client = Client("blockdev_addchild", [bdev.Dumps(), ndev.Dumps()])
  client.connect(node)
  client.run()
  results = client.getresult()
  return results.get(node, False)
515

    
516

    
517
def call_blockdev_removechild(node, bdev, ndev):
  """Request removing a child from a (mirroring) device.

  Both the device (bdev) and the child (ndev) are sent in their
  serialized (Dumps) form.

  This is a single-node call: the node's result is returned, or False
  if the call failed.

  """
  client = Client("blockdev_removechild", [bdev.Dumps(), ndev.Dumps()])
  client.connect(node)
  client.run()
  results = client.getresult()
  return results.get(node, False)
528

    
529

    
530
def call_blockdev_getmirrorstatus(node, disks):
  """Request the status of a list of (mirroring) devices.

  Every disk in disks is sent in its serialized (Dumps) form.

  This is a single-node call: the node's result is returned, or False
  if the call failed.

  """
  serialized = [dsk.Dumps() for dsk in disks]
  client = Client("blockdev_getmirrorstatus", serialized)
  client.connect(node)
  client.run()
  results = client.getresult()
  return results.get(node, False)
541

    
542

    
543
def call_blockdev_find(node, disk):
  """Request the identification of a given block device.

  The disk is sent in its serialized (Dumps) form.

  This is a single-node call: the node's result is returned, or False
  if the call failed.

  """
  client = Client("blockdev_find", [disk.Dumps()])
  client.connect(node)
  client.run()
  results = client.getresult()
  return results.get(node, False)
553

    
554

    
555
def call_upload_file(node_list, file_name):
  """Upload a file to several nodes.

  The contents of the file are sent together with its name, mode,
  owner, group and access/modification times.

  The node will refuse the operation in case the file is not on the
  approved file list.

  This is a multi-node call.

  Args:
    - node_list: the names of the nodes to send the file to
    - file_name: the path of the local file to read and upload

  Returns:
    a dict keyed by node name with the per-node results (False for
    the nodes on which the call failed)

  Raises:
    IOError or OSError if the local file cannot be opened, read or
    examined

  """
  fh = open(file_name)
  try:
    data = fh.read()
    # The metadata is taken from the descriptor we have just read
    # from, not from the path: the attributes then always describe
    # the same file as the data, even if the path gets replaced in
    # the meantime.
    st = os.fstat(fh.fileno())
  finally:
    fh.close()
  params = [file_name, data, st.st_mode, st.st_uid, st.st_gid,
            st.st_atime, st.st_mtime]
  c = Client("upload_file", params)
  c.connect_list(node_list)
  c.run()
  return c.getresult()
576

    
577

    
578
def call_os_diagnose(node_list):
  """Request a diagnose of the OS definitions on several nodes.

  The raw per-node result is converted into a list: string entries
  are loaded as config objects, two-element tuples are turned into
  errors.InvalidOS instances (stored, not raised) and false entries
  are skipped. A node with a false result gets an empty list.

  This is a multi-node call: the result is a dict keyed by node name.

  Raises errors.ProgrammerError if an entry is of any other kind.

  """
  client = Client("os_diagnose", [])
  client.connect_list(node_list)
  client.run()
  raw = client.getresult()
  converted = {}
  for node_name in raw:
    entries = []
    for item in raw[node_name] or ():
      if not item:
        continue
      if isinstance(item, basestring):
        entries.append(objects.ConfigObject.Loads(item))
      elif isinstance(item, tuple) and len(item) == 2:
        entries.append(errors.InvalidOS(item[0], item[1]))
      else:
        raise errors.ProgrammerError("Invalid data from"
                                     " xcserver.os_diagnose")
    converted[node_name] = entries
  return converted
603

    
604

    
605
def call_os_get(node_list, name):
  """Return an OS definition from several nodes.

  A string result is loaded as a config object and a two-element
  tuple is turned into an errors.InvalidOS instance (stored, not
  raised); any other result is passed on unchanged.

  This is a multi-node call: the result is a dict keyed by node name.

  """
  client = Client("os_get", [name])
  client.connect_list(node_list)
  client.run()
  raw = client.getresult()
  converted = {}
  for node_name in raw:
    value = raw[node_name]
    if isinstance(value, basestring):
      value = objects.ConfigObject.Loads(value)
    elif isinstance(value, tuple) and len(value) == 2:
      value = errors.InvalidOS(value[0], value[1])
    converted[node_name] = value
  return converted
625

    
626

    
627
def call_hooks_runner(node_list, hpath, phase, env):
  """Call the hooks runner on several nodes.

  Args:
    - node_list: the nodes on which to call the hooks runner
    - hpath: passed through unchanged (presumably the hooks path)
    - phase: passed through unchanged (presumably the hooks phase)
    - env: a dictionary with the environment

  This is a multi-node call: the result is a dict keyed by node name,
  with False as the value for nodes on which the call failed.

  """
  client = Client("hooks_runner", [hpath, phase, env])
  client.connect_list(node_list)
  client.run()
  return client.getresult()
643

    
644

    
645
def call_blockdev_snapshot(node, cf_bdev):
  """Request a snapshot of the given block device.

  The device is sent in its serialized (Dumps) form.

  This is a single-node call: the node's result is returned, or False
  if the call failed.

  """
  client = Client("blockdev_snapshot", [cf_bdev.Dumps()])
  client.connect(node)
  client.run()
  results = client.getresult()
  return results.get(node, False)
655

    
656

    
657
def call_snapshot_export(node, snap_bdev, dest_node, instance):
  """Request the export of a given snapshot.

  The snapshot device and the instance are sent in their serialized
  (Dumps) form; dest_node is passed through unchanged.

  This is a single-node call: the node's result is returned, or False
  if the call failed.

  """
  client = Client("snapshot_export",
                  [snap_bdev.Dumps(), dest_node, instance.Dumps()])
  client.connect(node)
  client.run()
  results = client.getresult()
  return results.get(node, False)
668

    
669

    
670
def call_finalize_export(node, instance, snap_disks):
  """Request the completion of an export operation.

  This writes the export config file, etc. The instance and every
  disk in snap_disks are sent in their serialized (Dumps) form.

  This is a single-node call: the node's result is returned, or False
  if the call failed.

  """
  flat_disks = [disk.Dumps() for disk in snap_disks]
  client = Client("finalize_export", [instance.Dumps(), flat_disks])
  client.connect(node)
  client.run()
  results = client.getresult()
  return results.get(node, False)
686

    
687

    
688
def call_export_info(node, path):
  """Query the export information in a given path.

  A true result is loaded through
  objects.SerializableConfigParser.Loads and returned; a false result
  (including the False of a failed call) is returned as it is.

  This is a single-node call.

  """
  client = Client("export_info", [path])
  client.connect(node)
  client.run()
  serialized = client.getresult().get(node, False)
  if serialized:
    return objects.SerializableConfigParser.Loads(serialized)
  return serialized
701

    
702

    
703
def call_instance_os_import(node, inst, osdev, swapdev, src_node, src_image):
  """Request the import of a backup into an instance.

  The instance is sent in its serialized (Dumps) form; the other
  arguments are passed through unchanged.

  This is a single-node call: the node's result is returned, or False
  if the call failed.

  """
  client = Client("instance_os_import",
                  [inst.Dumps(), osdev, swapdev, src_node, src_image])
  client.connect(node)
  client.run()
  results = client.getresult()
  return results.get(node, False)
714

    
715

    
716
def call_export_list(node_list):
  """Query the stored exports list on several nodes.

  This is a multi-node call: the result is a dict keyed by node name,
  with False as the value for nodes that could not be queried.

  """
  client = Client("export_list", [])
  client.connect_list(node_list)
  client.run()
  return client.getresult()
727

    
728

    
729
def call_export_remove(node, export):
  """Request the removal of a given export.

  This is a single-node call: the node's result is returned, or False
  if the call failed.

  """
  client = Client("export_remove", [export])
  client.connect(node)
  client.run()
  results = client.getresult()
  return results.get(node, False)
739

    
740

    
741
def call_node_leave_cluster(node):
  """Request a node to clean the cluster information it has.

  This will remove the configuration information from the ganeti data
  dir.

  This is a single-node call: the node's result is returned, or False
  if the call failed.

  """
  client = Client("node_leave_cluster", [])
  client.connect(node)
  client.run()
  results = client.getresult()
  return results.get(node, False)
754

    
755

    
756
def call_node_volumes(node_list):
  """Query all the volumes on the given node(s).

  This is a multi-node call: the result is a dict keyed by node name,
  with False as the value for nodes that could not be queried.

  """
  client = Client("node_volumes", [])
  client.connect_list(node_list)
  client.run()
  return client.getresult()