Statistics
| Branch: | Tag: | Revision:

root / lib / rpc.py @ a0c3fea1

History | View | Annotate | Download (18.1 kB)

1
#!/usr/bin/python
2
#
3

    
4
# Copyright (C) 2006, 2007 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Script to show add a new node to the cluster
23

24
"""
25

    
26
# pylint: disable-msg=C0103
27

    
28
import os
29

    
30
from twisted.internet.pollreactor import PollReactor
31

    
32
class ReReactor(PollReactor):
33
  """A re-startable Reactor implementation.
34

35
  """
36
  def run(self, installSignalHandlers=1):
37
    """Custom run method.
38

39
    This is customized run that, before calling Reactor.run, will
40
    reinstall the shutdown events and re-create the threadpool in case
41
    these are not present (as will happen on the second run of the
42
    reactor).
43

44
    """
45
    if not 'shutdown' in self._eventTriggers:
46
      # the shutdown queue has been killed, we are most probably
47
      # at the second run, thus recreate the queue
48
      self.addSystemEventTrigger('during', 'shutdown', self.crash)
49
      self.addSystemEventTrigger('during', 'shutdown', self.disconnectAll)
50
    if self.threadpool is not None and self.threadpool.joined == 1:
51
      # in case the threadpool has been stopped, re-start it
52
      # and add a trigger to stop it at reactor shutdown
53
      self.threadpool.start()
54
      self.addSystemEventTrigger('during', 'shutdown', self.threadpool.stop)
55

    
56
    return PollReactor.run(self, installSignalHandlers)
57

    
58

    
59
import twisted.internet.main
60
twisted.internet.main.installReactor(ReReactor())
61

    
62
from twisted.spread import pb
63
from twisted.internet import reactor
64
from twisted.cred import credentials
65
from OpenSSL import SSL, crypto
66

    
67
from ganeti import logger
68
from ganeti import utils
69
from ganeti import errors
70
from ganeti import constants
71
from ganeti import objects
72
from ganeti import ssconf
73

    
74
class NodeController:
75
  """Node-handling class.
76

77
  For each node that we speak with, we create an instance of this
78
  class, so that we have a safe place to store the details of this
79
  individual call.
80

81
  """
82
  def __init__(self, parent, node):
83
    self.parent = parent
84
    self.node = node
85

    
86
  def _check_end(self):
87
    """Stop the reactor if we got all the results.
88

89
    """
90
    if len(self.parent.results) == len(self.parent.nc):
91
      reactor.stop()
92

    
93
  def cb_call(self, obj):
94
    """Callback for successfull connect.
95

96
    If the connect and login sequence succeeded, we proceed with
97
    making the actual call.
98

99
    """
100
    deferred = obj.callRemote(self.parent.procedure, self.parent.args)
101
    deferred.addCallbacks(self.cb_done, self.cb_err2)
102

    
103
  def cb_done(self, result):
104
    """Callback for successful call.
105

106
    When we receive the result from a call, we check if it was an
107
    error and if so we raise a generic RemoteError (we can't pass yet
108
    the actual exception over). If there was no error, we store the
109
    result.
110

111
    """
112
    tb, self.parent.results[self.node] = result
113
    self._check_end()
114
    if tb:
115
      raise errors.RemoteError("Remote procedure error calling %s on %s:"
116
                               "\n%s" % (self.parent.procedure,
117
                                         self.node,
118
                                         tb))
119

    
120
  def cb_err1(self, reason):
121
    """Error callback for unsuccessful connect.
122

123
    """
124
    logger.Error("caller_connect: could not connect to remote host %s,"
125
                 " reason %s" % (self.node, reason))
126
    self.parent.results[self.node] = False
127
    self._check_end()
128

    
129
  def cb_err2(self, reason):
130
    """Error callback for unsuccessful call.
131

132
    This is when the call didn't return anything, not even an error,
133
    or when it time out, etc.
134

135
    """
136
    logger.Error("caller_call: could not call %s on node %s,"
137
                 " reason %s" % (self.parent.procedure, self.node, reason))
138
    self.parent.results[self.node] = False
139
    self._check_end()
140

    
141

    
142
class MirrorContextFactory:
143
  """Certificate verifier factory.
144

145
  This factory creates contexts that verify if the remote end has a
146
  specific certificate (i.e. our own certificate).
147

148
  The checks we do are that the PEM dump of the certificate is the
149
  same as our own and (somewhat redundantly) that the SHA checksum is
150
  the same.
151

152
  """
153
  isClient = 1
154

    
155
  def __init__(self):
156
    try:
157
      fd = open(constants.SSL_CERT_FILE, 'r')
158
      try:
159
        data = fd.read(16384)
160
      finally:
161
        fd.close()
162
    except EnvironmentError, err:
163
      raise errors.ConfigurationError, ("missing SSL certificate: %s" %
164
                                        str(err))
165
    self.mycert = crypto.load_certificate(crypto.FILETYPE_PEM, data)
166
    self.mypem = crypto.dump_certificate(crypto.FILETYPE_PEM, self.mycert)
167
    self.mydigest = self.mycert.digest('SHA')
168

    
169
  def verifier(self, conn, x509, errno, err_depth, retcode):
170
    """Certificate verify method.
171

172
    """
173
    if self.mydigest != x509.digest('SHA'):
174
      return False
175
    if crypto.dump_certificate(crypto.FILETYPE_PEM, x509) != self.mypem:
176
      return False
177
    return True
178

    
179
  def getContext(self):
180
    """Context generator.
181

182
    """
183
    context = SSL.Context(SSL.TLSv1_METHOD)
184
    context.set_verify(SSL.VERIFY_PEER, self.verifier)
185
    return context
186

    
187
class Client:
188
  """RPC Client class.
189

190
  This class, given a (remote) ethod name, a list of parameters and a
191
  list of nodes, will contact (in parallel) all nodes, and return a
192
  dict of results (key: node name, value: result).
193

194
  One current bug is that generic failure is still signalled by
195
  'False' result, which is not good. This overloading of values can
196
  cause bugs.
197

198
  """
199
  result_set = False
200
  result = False
201
  allresult = []
202

    
203
  def __init__(self, procedure, args):
204
    ss = ssconf.SimpleStore()
205
    self.port = ss.GetNodeDaemonPort()
206
    self.nodepw = ss.GetNodeDaemonPassword()
207
    self.nc = {}
208
    self.results = {}
209
    self.procedure = procedure
210
    self.args = args
211

    
212
  #--- generic connector -------------
213

    
214
  def connect_list(self, node_list):
215
    """Add a list of nodes to the target nodes.
216

217
    """
218
    for node in node_list:
219
      self.connect(node)
220

    
221
  def connect(self, connect_node):
222
    """Add a node to the target list.
223

224
    """
225
    factory = pb.PBClientFactory()
226
    self.nc[connect_node] = nc = NodeController(self, connect_node)
227
    reactor.connectSSL(connect_node, self.port, factory,
228
                       MirrorContextFactory())
229
    #d = factory.getRootObject()
230
    d = factory.login(credentials.UsernamePassword("master_node", self.nodepw))
231
    d.addCallbacks(nc.cb_call, nc.cb_err1)
232

    
233
  def getresult(self):
234
    """Return the results of the call.
235

236
    """
237
    return self.results
238

    
239
  def run(self):
240
    """Wrapper over reactor.run().
241

242
    This function simply calls reactor.run() if we have any requests
243
    queued, otherwise it does nothing.
244

245
    """
246
    if self.nc:
247
      reactor.run()
248

    
249

    
250
def call_volume_list(node_list, vg_name):
251
  """Gets the logical volumes present in a given volume group.
252

253
  This is a multi-node call.
254

255
  """
256
  c = Client("volume_list", [vg_name])
257
  c.connect_list(node_list)
258
  c.run()
259
  return c.getresult()
260

    
261

    
262
def call_vg_list(node_list):
263
  """Gets the volume group list.
264

265
  This is a multi-node call.
266

267
  """
268
  c = Client("vg_list", [])
269
  c.connect_list(node_list)
270
  c.run()
271
  return c.getresult()
272

    
273

    
274
def call_bridges_exist(node, bridges_list):
275
  """Checks if a node has all the bridges given.
276

277
  This method checks if all bridges given in the bridges_list are
278
  present on the remote node, so that an instance that uses interfaces
279
  on those bridges can be started.
280

281
  This is a single-node call.
282

283
  """
284
  c = Client("bridges_exist", [bridges_list])
285
  c.connect(node)
286
  c.run()
287
  return c.getresult().get(node, False)
288

    
289

    
290
def call_instance_start(node, instance, extra_args):
291
  """Stars an instance.
292

293
  This is a single-node call.
294

295
  """
296
  c = Client("instance_start", [instance.Dumps(), extra_args])
297
  c.connect(node)
298
  c.run()
299
  return c.getresult().get(node, False)
300

    
301

    
302
def call_instance_shutdown(node, instance):
303
  """Stops an instance.
304

305
  This is a single-node call.
306

307
  """
308
  c = Client("instance_shutdown", [instance.Dumps()])
309
  c.connect(node)
310
  c.run()
311
  return c.getresult().get(node, False)
312

    
313

    
314
def call_instance_os_add(node, inst, osdev, swapdev):
315
  """Installs an OS on the given instance.
316

317
  This is a single-node call.
318

319
  """
320
  params = [inst.Dumps(), osdev, swapdev]
321
  c = Client("instance_os_add", params)
322
  c.connect(node)
323
  c.run()
324
  return c.getresult().get(node, False)
325

    
326

    
327
def call_instance_info(node, instance):
328
  """Returns information about a single instance.
329

330
  This is a single-node call.
331

332
  """
333
  c = Client("instance_info", [instance])
334
  c.connect(node)
335
  c.run()
336
  return c.getresult().get(node, False)
337

    
338

    
339
def call_all_instances_info(node_list):
340
  """Returns information about all instances on a given node.
341

342
  This is a single-node call.
343

344
  """
345
  c = Client("all_instances_info", [])
346
  c.connect_list(node_list)
347
  c.run()
348
  return c.getresult()
349

    
350

    
351
def call_instance_list(node_list):
352
  """Returns the list of running instances on a given node.
353

354
  This is a single-node call.
355

356
  """
357
  c = Client("instance_list", [])
358
  c.connect_list(node_list)
359
  c.run()
360
  return c.getresult()
361

    
362

    
363
def call_node_info(node_list, vg_name):
364
  """Return node information.
365

366
  This will return memory information and volume group size and free
367
  space.
368

369
  This is a multi-node call.
370

371
  """
372
  c = Client("node_info", [vg_name])
373
  c.connect_list(node_list)
374
  c.run()
375
  retux = c.getresult()
376

    
377
  for node_name in retux:
378
    ret = retux.get(node_name, False)
379
    if type(ret) != dict:
380
      logger.Error("could not connect to node %s" % (node_name))
381
      ret = {}
382

    
383
    utils.CheckDict(ret,
384
                    { 'memory_total' : '-',
385
                      'memory_dom0' : '-',
386
                      'memory_free' : '-',
387
                      'vg_size' : 'node_unreachable',
388
                      'vg_free' : '-' },
389
                    "call_node_info",
390
                    )
391
  return retux
392

    
393

    
394
def call_node_add(node, dsa, dsapub, rsa, rsapub, ssh, sshpub):
395
  """Add a node to the cluster.
396

397
  This is a single-node call.
398

399
  """
400
  params = [dsa, dsapub, rsa, rsapub, ssh, sshpub]
401
  c = Client("node_add", params)
402
  c.connect(node)
403
  c.run()
404
  return c.getresult().get(node, False)
405

    
406

    
407
def call_node_verify(node_list, checkdict):
408
  """Request verification of given parameters.
409

410
  This is a multi-node call.
411

412
  """
413
  c = Client("node_verify", [checkdict])
414
  c.connect_list(node_list)
415
  c.run()
416
  return c.getresult()
417

    
418

    
419
def call_node_start_master(node):
420
  """Tells a node to activate itself as a master.
421

422
  This is a single-node call.
423

424
  """
425
  c = Client("node_start_master", [])
426
  c.connect(node)
427
  c.run()
428
  return c.getresult().get(node, False)
429

    
430

    
431
def call_node_stop_master(node):
432
  """Tells a node to demote itself from master status.
433

434
  This is a single-node call.
435

436
  """
437
  c = Client("node_stop_master", [])
438
  c.connect(node)
439
  c.run()
440
  return c.getresult().get(node, False)
441

    
442

    
443
def call_version(node_list):
444
  """Query node version.
445

446
  This is a multi-node call.
447

448
  """
449
  c = Client("version", [])
450
  c.connect_list(node_list)
451
  c.run()
452
  return c.getresult()
453

    
454

    
455
def call_blockdev_create(node, bdev, size, on_primary, info):
456
  """Request creation of a given block device.
457

458
  This is a single-node call.
459

460
  """
461
  params = [bdev.Dumps(), size, on_primary, info]
462
  c = Client("blockdev_create", params)
463
  c.connect(node)
464
  c.run()
465
  return c.getresult().get(node, False)
466

    
467

    
468
def call_blockdev_remove(node, bdev):
469
  """Request removal of a given block device.
470

471
  This is a single-node call.
472

473
  """
474
  c = Client("blockdev_remove", [bdev.Dumps()])
475
  c.connect(node)
476
  c.run()
477
  return c.getresult().get(node, False)
478

    
479

    
480
def call_blockdev_assemble(node, disk, on_primary):
481
  """Request assembling of a given block device.
482

483
  This is a single-node call.
484

485
  """
486
  params = [disk.Dumps(), on_primary]
487
  c = Client("blockdev_assemble", params)
488
  c.connect(node)
489
  c.run()
490
  return c.getresult().get(node, False)
491

    
492

    
493
def call_blockdev_shutdown(node, disk):
494
  """Request shutdown of a given block device.
495

496
  This is a single-node call.
497

498
  """
499
  c = Client("blockdev_shutdown", [disk.Dumps()])
500
  c.connect(node)
501
  c.run()
502
  return c.getresult().get(node, False)
503

    
504

    
505
def call_blockdev_addchild(node, bdev, ndev):
506
  """Request adding a new child to a (mirroring) device.
507

508
  This is a single-node call.
509

510
  """
511
  params = [bdev.Dumps(), ndev.Dumps()]
512
  c = Client("blockdev_addchild", params)
513
  c.connect(node)
514
  c.run()
515
  return c.getresult().get(node, False)
516

    
517

    
518
def call_blockdev_removechild(node, bdev, ndev):
519
  """Request removing a new child from a (mirroring) device.
520

521
  This is a single-node call.
522

523
  """
524
  params = [bdev.Dumps(), ndev.Dumps()]
525
  c = Client("blockdev_removechild", params)
526
  c.connect(node)
527
  c.run()
528
  return c.getresult().get(node, False)
529

    
530

    
531
def call_blockdev_getmirrorstatus(node, disks):
532
  """Request status of a (mirroring) device.
533

534
  This is a single-node call.
535

536
  """
537
  params = [dsk.Dumps() for dsk in disks]
538
  c = Client("blockdev_getmirrorstatus", params)
539
  c.connect(node)
540
  c.run()
541
  return c.getresult().get(node, False)
542

    
543

    
544
def call_blockdev_find(node, disk):
545
  """Request identification of a given block device.
546

547
  This is a single-node call.
548

549
  """
550
  c = Client("blockdev_find", [disk.Dumps()])
551
  c.connect(node)
552
  c.run()
553
  return c.getresult().get(node, False)
554

    
555

    
556
def call_upload_file(node_list, file_name):
557
  """Upload a file.
558

559
  The node will refuse the operation in case the file is not on the
560
  approved file list.
561

562
  This is a multi-node call.
563

564
  """
565
  fh = file(file_name)
566
  try:
567
    data = fh.read()
568
  finally:
569
    fh.close()
570
  st = os.stat(file_name)
571
  params = [file_name, data, st.st_mode, st.st_uid, st.st_gid,
572
            st.st_atime, st.st_mtime]
573
  c = Client("upload_file", params)
574
  c.connect_list(node_list)
575
  c.run()
576
  return c.getresult()
577

    
578

    
579
def call_os_diagnose(node_list):
580
  """Request a diagnose of OS definitions.
581

582
  This is a multi-node call.
583

584
  """
585
  c = Client("os_diagnose", [])
586
  c.connect_list(node_list)
587
  c.run()
588
  result = c.getresult()
589
  new_result = {}
590
  for node_name in result:
591
    nr = []
592
    if result[node_name]:
593
      for data in result[node_name]:
594
        if data:
595
          if isinstance(data, basestring):
596
            nr.append(objects.ConfigObject.Loads(data))
597
          elif isinstance(data, tuple) and len(data) == 2:
598
            nr.append(errors.InvalidOS(data[0], data[1]))
599
          else:
600
            raise errors.ProgrammerError, ("Invalid data from"
601
                                           " xcserver.os_diagnose")
602
    new_result[node_name] = nr
603
  return new_result
604

    
605

    
606
def call_os_get(node_list, name):
607
  """Returns an OS definition.
608

609
  This is a multi-node call.
610

611
  """
612
  c = Client("os_get", [name])
613
  c.connect_list(node_list)
614
  c.run()
615
  result = c.getresult()
616
  new_result = {}
617
  for node_name in result:
618
    data = result[node_name]
619
    if isinstance(data, basestring):
620
      new_result[node_name] = objects.ConfigObject.Loads(data)
621
    elif isinstance(data, tuple) and len(data) == 2:
622
      new_result[node_name] = errors.InvalidOS(data[0], data[1])
623
    else:
624
      new_result[node_name] = data
625
  return new_result
626

    
627

    
628
def call_hooks_runner(node_list, hpath, phase, env):
629
  """Call the hooks runner.
630

631
  Args:
632
    - op: the OpCode instance
633
    - env: a dictionary with the environment
634

635
  This is a multi-node call.
636

637
  """
638
  params = [hpath, phase, env]
639
  c = Client("hooks_runner", params)
640
  c.connect_list(node_list)
641
  c.run()
642
  result = c.getresult()
643
  return result
644

    
645

    
646
def call_blockdev_snapshot(node, cf_bdev):
647
  """Request a snapshot of the given block device.
648

649
  This is a single-node call.
650

651
  """
652
  c = Client("blockdev_snapshot", [cf_bdev.Dumps()])
653
  c.connect(node)
654
  c.run()
655
  return c.getresult().get(node, False)
656

    
657

    
658
def call_snapshot_export(node, snap_bdev, dest_node, instance):
659
  """Request the export of a given snapshot.
660

661
  This is a single-node call.
662

663
  """
664
  params = [snap_bdev.Dumps(), dest_node, instance.Dumps()]
665
  c = Client("snapshot_export", params)
666
  c.connect(node)
667
  c.run()
668
  return c.getresult().get(node, False)
669

    
670

    
671
def call_finalize_export(node, instance, snap_disks):
672
  """Request the completion of an export operation.
673

674
  This writes the export config file, etc.
675

676
  This is a single-node call.
677

678
  """
679
  flat_disks = []
680
  for disk in snap_disks:
681
    flat_disks.append(disk.Dumps())
682
  params = [instance.Dumps(), flat_disks]
683
  c = Client("finalize_export", params)
684
  c.connect(node)
685
  c.run()
686
  return c.getresult().get(node, False)
687

    
688

    
689
def call_export_info(node, path):
690
  """Queries the export information in a given path.
691

692
  This is a single-node call.
693

694
  """
695
  c = Client("export_info", [path])
696
  c.connect(node)
697
  c.run()
698
  result = c.getresult().get(node, False)
699
  if not result:
700
    return result
701
  return objects.SerializableConfigParser.Loads(result)
702

    
703

    
704
def call_instance_os_import(node, inst, osdev, swapdev, src_node, src_image):
705
  """Request the import of a backup into an instance.
706

707
  This is a single-node call.
708

709
  """
710
  params = [inst.Dumps(), osdev, swapdev, src_node, src_image]
711
  c = Client("instance_os_import", params)
712
  c.connect(node)
713
  c.run()
714
  return c.getresult().get(node, False)
715

    
716

    
717
def call_export_list(node_list):
718
  """Gets the stored exports list.
719

720
  This is a multi-node call.
721

722
  """
723
  c = Client("export_list", [])
724
  c.connect_list(node_list)
725
  c.run()
726
  result = c.getresult()
727
  return result
728

    
729

    
730
def call_export_remove(node, export):
731
  """Requests removal of a given export.
732

733
  This is a single-node call.
734

735
  """
736
  c = Client("export_remove", [export])
737
  c.connect(node)
738
  c.run()
739
  return c.getresult().get(node, False)
740

    
741

    
742
def call_node_leave_cluster(node):
743
  """Requests a node to clean the cluster information it has.
744

745
  This will remove the configuration information from the ganeti data
746
  dir.
747

748
  This is a single-node call.
749

750
  """
751
  c = Client("node_leave_cluster", [])
752
  c.connect(node)
753
  c.run()
754
  return c.getresult().get(node, False)
755

    
756

    
757
def call_node_volumes(node_list):
758
  """Gets all volumes on node(s).
759

760
  This is a multi-node call.
761

762
  """
763
  c = Client("node_volumes", [])
764
  c.connect_list(node_list)
765
  c.run()
766
  return c.getresult()