Statistics
| Branch: | Tag: | Revision:

root / lib / rpc.py @ b9bddb6b

History | View | Annotate | Download (19.9 kB)

1
#
2
#
3

    
4
# Copyright (C) 2006, 2007 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Script to show add a new node to the cluster
23

24
"""
25

    
26
# pylint: disable-msg=C0103
27

    
28
import os
29
import socket
30
import httplib
31

    
32
import simplejson
33

    
34
from ganeti import logger
35
from ganeti import utils
36
from ganeti import objects
37

    
38

    
39
class NodeController:
40
  """Node-handling class.
41

42
  For each node that we speak with, we create an instance of this
43
  class, so that we have a safe place to store the details of this
44
  individual call.
45

46
  """
47
  def __init__(self, parent, node):
48
    self.parent = parent
49
    self.node = node
50
    self.failed = False
51

    
52
    self.http_conn = hc = httplib.HTTPConnection(node, self.parent.port)
53
    try:
54
      hc.connect()
55
      hc.putrequest('PUT', "/%s" % self.parent.procedure,
56
                    skip_accept_encoding=True)
57
      hc.putheader('Content-Length', str(len(parent.body)))
58
      hc.endheaders()
59
      hc.send(parent.body)
60
    except socket.error, err:
61
      logger.Error("Error connecting to %s: %s" % (node, str(err)))
62
      self.failed = True
63

    
64
  def get_response(self):
65
    """Try to process the response from the node.
66

67
    """
68
    if self.failed:
69
      # we already failed in connect
70
      return False
71
    resp = self.http_conn.getresponse()
72
    if resp.status != 200:
73
      return False
74
    try:
75
      length = int(resp.getheader('Content-Length', '0'))
76
    except ValueError:
77
      return False
78
    if not length:
79
      logger.Error("Zero-length reply from %s" % self.node)
80
      return False
81
    payload = resp.read(length)
82
    unload = simplejson.loads(payload)
83
    return unload
84

    
85

    
86
class Client:
87
  """RPC Client class.
88

89
  This class, given a (remote) method name, a list of parameters and a
90
  list of nodes, will contact (in parallel) all nodes, and return a
91
  dict of results (key: node name, value: result).
92

93
  One current bug is that generic failure is still signalled by
94
  'False' result, which is not good. This overloading of values can
95
  cause bugs.
96

97
  """
98
  result_set = False
99
  result = False
100
  allresult = []
101

    
102
  def __init__(self, procedure, args):
103
    self.port = utils.GetNodeDaemonPort()
104
    self.nodepw = utils.GetNodeDaemonPassword()
105
    self.nc = {}
106
    self.results = {}
107
    self.procedure = procedure
108
    self.args = args
109
    self.body = simplejson.dumps(args)
110

    
111
  #--- generic connector -------------
112

    
113
  def connect_list(self, node_list):
114
    """Add a list of nodes to the target nodes.
115

116
    """
117
    for node in node_list:
118
      self.connect(node)
119

    
120
  def connect(self, connect_node):
121
    """Add a node to the target list.
122

123
    """
124
    self.nc[connect_node] = nc = NodeController(self, connect_node)
125

    
126
  def getresult(self):
127
    """Return the results of the call.
128

129
    """
130
    return self.results
131

    
132
  def run(self):
133
    """Wrapper over reactor.run().
134

135
    This function simply calls reactor.run() if we have any requests
136
    queued, otherwise it does nothing.
137

138
    """
139
    for node, nc in self.nc.items():
140
      self.results[node] = nc.get_response()
141

    
142

    
143
def call_volume_list(node_list, vg_name):
144
  """Gets the logical volumes present in a given volume group.
145

146
  This is a multi-node call.
147

148
  """
149
  c = Client("volume_list", [vg_name])
150
  c.connect_list(node_list)
151
  c.run()
152
  return c.getresult()
153

    
154

    
155
def call_vg_list(node_list):
156
  """Gets the volume group list.
157

158
  This is a multi-node call.
159

160
  """
161
  c = Client("vg_list", [])
162
  c.connect_list(node_list)
163
  c.run()
164
  return c.getresult()
165

    
166

    
167
def call_bridges_exist(node, bridges_list):
168
  """Checks if a node has all the bridges given.
169

170
  This method checks if all bridges given in the bridges_list are
171
  present on the remote node, so that an instance that uses interfaces
172
  on those bridges can be started.
173

174
  This is a single-node call.
175

176
  """
177
  c = Client("bridges_exist", [bridges_list])
178
  c.connect(node)
179
  c.run()
180
  return c.getresult().get(node, False)
181

    
182

    
183
def call_instance_start(node, instance, extra_args):
184
  """Starts an instance.
185

186
  This is a single-node call.
187

188
  """
189
  c = Client("instance_start", [instance.ToDict(), extra_args])
190
  c.connect(node)
191
  c.run()
192
  return c.getresult().get(node, False)
193

    
194

    
195
def call_instance_shutdown(node, instance):
196
  """Stops an instance.
197

198
  This is a single-node call.
199

200
  """
201
  c = Client("instance_shutdown", [instance.ToDict()])
202
  c.connect(node)
203
  c.run()
204
  return c.getresult().get(node, False)
205

    
206

    
207
def call_instance_migrate(node, instance, target, live):
208
  """Migrate an instance.
209

210
  This is a single-node call.
211

212
  @type node: string
213
  @param node: the node on which the instance is currently running
214
  @type instance: C{objects.Instance}
215
  @param instance: the instance definition
216
  @type target: string
217
  @param target: the target node name
218
  @type live: boolean
219
  @param live: whether the migration should be done live or not (the
220
      interpretation of this parameter is left to the hypervisor)
221

222
  """
223
  c = Client("instance_migrate", [instance.ToDict(), target, live])
224
  c.connect(node)
225
  c.run()
226
  return c.getresult().get(node, False)
227

    
228

    
229
def call_instance_reboot(node, instance, reboot_type, extra_args):
230
  """Reboots an instance.
231

232
  This is a single-node call.
233

234
  """
235
  c = Client("instance_reboot", [instance.ToDict(), reboot_type, extra_args])
236
  c.connect(node)
237
  c.run()
238
  return c.getresult().get(node, False)
239

    
240

    
241
def call_instance_os_add(node, inst, osdev, swapdev):
242
  """Installs an OS on the given instance.
243

244
  This is a single-node call.
245

246
  """
247
  params = [inst.ToDict(), osdev, swapdev]
248
  c = Client("instance_os_add", params)
249
  c.connect(node)
250
  c.run()
251
  return c.getresult().get(node, False)
252

    
253

    
254
def call_instance_run_rename(node, inst, old_name, osdev, swapdev):
255
  """Run the OS rename script for an instance.
256

257
  This is a single-node call.
258

259
  """
260
  params = [inst.ToDict(), old_name, osdev, swapdev]
261
  c = Client("instance_run_rename", params)
262
  c.connect(node)
263
  c.run()
264
  return c.getresult().get(node, False)
265

    
266

    
267
def call_instance_info(node, instance, hname):
268
  """Returns information about a single instance.
269

270
  This is a single-node call.
271

272
  @type node_list: list
273
  @param node_list: the list of nodes to query
274
  @type instance: string
275
  @param instance: the instance name
276
  @type hname: string
277
  @param hname: the hypervisor type of the instance
278

279
  """
280
  c = Client("instance_info", [instance])
281
  c.connect(node)
282
  c.run()
283
  return c.getresult().get(node, False)
284

    
285

    
286
def call_all_instances_info(node_list, hypervisor_list):
287
  """Returns information about all instances on the given nodes.
288

289
  This is a multi-node call.
290

291
  @type node_list: list
292
  @param node_list: the list of nodes to query
293
  @type hypervisor_list: list
294
  @param hypervisor_list: the hypervisors to query for instances
295

296
  """
297
  c = Client("all_instances_info", [hypervisor_list])
298
  c.connect_list(node_list)
299
  c.run()
300
  return c.getresult()
301

    
302

    
303
def call_instance_list(node_list, hypervisor_list):
304
  """Returns the list of running instances on a given node.
305

306
  This is a multi-node call.
307

308
  @type node_list: list
309
  @param node_list: the list of nodes to query
310
  @type hypervisor_list: list
311
  @param hypervisor_list: the hypervisors to query for instances
312

313
  """
314
  c = Client("instance_list", [hypervisor_list])
315
  c.connect_list(node_list)
316
  c.run()
317
  return c.getresult()
318

    
319

    
320
def call_node_tcp_ping(node, source, target, port, timeout, live_port_needed):
321
  """Do a TcpPing on the remote node
322

323
  This is a single-node call.
324
  """
325
  c = Client("node_tcp_ping", [source, target, port, timeout,
326
                               live_port_needed])
327
  c.connect(node)
328
  c.run()
329
  return c.getresult().get(node, False)
330

    
331

    
332
def call_node_info(node_list, vg_name, hypervisor_type):
333
  """Return node information.
334

335
  This will return memory information and volume group size and free
336
  space.
337

338
  This is a multi-node call.
339

340
  @type node_list: list
341
  @param node_list: the list of nodes to query
342
  @type vgname: C{string}
343
  @param vgname: the name of the volume group to ask for disk space information
344
  @type hypervisor_type: C{str}
345
  @param hypervisor_type: the name of the hypervisor to ask for
346
      memory information
347

348
  """
349
  c = Client("node_info", [vg_name, hypervisor_type])
350
  c.connect_list(node_list)
351
  c.run()
352
  retux = c.getresult()
353

    
354
  for node_name in retux:
355
    ret = retux.get(node_name, False)
356
    if type(ret) != dict:
357
      logger.Error("could not connect to node %s" % (node_name))
358
      ret = {}
359

    
360
    utils.CheckDict(ret,
361
                    { 'memory_total' : '-',
362
                      'memory_dom0' : '-',
363
                      'memory_free' : '-',
364
                      'vg_size' : 'node_unreachable',
365
                      'vg_free' : '-' },
366
                    "call_node_info",
367
                    )
368
  return retux
369

    
370

    
371
def call_node_add(node, dsa, dsapub, rsa, rsapub, ssh, sshpub):
372
  """Add a node to the cluster.
373

374
  This is a single-node call.
375

376
  """
377
  params = [dsa, dsapub, rsa, rsapub, ssh, sshpub]
378
  c = Client("node_add", params)
379
  c.connect(node)
380
  c.run()
381
  return c.getresult().get(node, False)
382

    
383

    
384
def call_node_verify(node_list, checkdict, cluster_name):
385
  """Request verification of given parameters.
386

387
  This is a multi-node call.
388

389
  """
390
  c = Client("node_verify", [checkdict, cluster_name])
391
  c.connect_list(node_list)
392
  c.run()
393
  return c.getresult()
394

    
395

    
396
def call_node_start_master(node, start_daemons):
397
  """Tells a node to activate itself as a master.
398

399
  This is a single-node call.
400

401
  """
402
  c = Client("node_start_master", [start_daemons])
403
  c.connect(node)
404
  c.run()
405
  return c.getresult().get(node, False)
406

    
407

    
408
def call_node_stop_master(node, stop_daemons):
409
  """Tells a node to demote itself from master status.
410

411
  This is a single-node call.
412

413
  """
414
  c = Client("node_stop_master", [stop_daemons])
415
  c.connect(node)
416
  c.run()
417
  return c.getresult().get(node, False)
418

    
419

    
420
def call_master_info(node_list):
421
  """Query master info.
422

423
  This is a multi-node call.
424

425
  """
426
  c = Client("master_info", [])
427
  c.connect_list(node_list)
428
  c.run()
429
  return c.getresult()
430

    
431

    
432
def call_version(node_list):
433
  """Query node version.
434

435
  This is a multi-node call.
436

437
  """
438
  c = Client("version", [])
439
  c.connect_list(node_list)
440
  c.run()
441
  return c.getresult()
442

    
443

    
444
def call_blockdev_create(node, bdev, size, owner, on_primary, info):
445
  """Request creation of a given block device.
446

447
  This is a single-node call.
448

449
  """
450
  params = [bdev.ToDict(), size, owner, on_primary, info]
451
  c = Client("blockdev_create", params)
452
  c.connect(node)
453
  c.run()
454
  return c.getresult().get(node, False)
455

    
456

    
457
def call_blockdev_remove(node, bdev):
458
  """Request removal of a given block device.
459

460
  This is a single-node call.
461

462
  """
463
  c = Client("blockdev_remove", [bdev.ToDict()])
464
  c.connect(node)
465
  c.run()
466
  return c.getresult().get(node, False)
467

    
468

    
469
def call_blockdev_rename(node, devlist):
470
  """Request rename of the given block devices.
471

472
  This is a single-node call.
473

474
  """
475
  params = [(d.ToDict(), uid) for d, uid in devlist]
476
  c = Client("blockdev_rename", params)
477
  c.connect(node)
478
  c.run()
479
  return c.getresult().get(node, False)
480

    
481

    
482
def call_blockdev_assemble(node, disk, owner, on_primary):
483
  """Request assembling of a given block device.
484

485
  This is a single-node call.
486

487
  """
488
  params = [disk.ToDict(), owner, on_primary]
489
  c = Client("blockdev_assemble", params)
490
  c.connect(node)
491
  c.run()
492
  return c.getresult().get(node, False)
493

    
494

    
495
def call_blockdev_shutdown(node, disk):
496
  """Request shutdown of a given block device.
497

498
  This is a single-node call.
499

500
  """
501
  c = Client("blockdev_shutdown", [disk.ToDict()])
502
  c.connect(node)
503
  c.run()
504
  return c.getresult().get(node, False)
505

    
506

    
507
def call_blockdev_addchildren(node, bdev, ndevs):
508
  """Request adding a list of children to a (mirroring) device.
509

510
  This is a single-node call.
511

512
  """
513
  params = [bdev.ToDict(), [disk.ToDict() for disk in ndevs]]
514
  c = Client("blockdev_addchildren", params)
515
  c.connect(node)
516
  c.run()
517
  return c.getresult().get(node, False)
518

    
519

    
520
def call_blockdev_removechildren(node, bdev, ndevs):
521
  """Request removing a list of children from a (mirroring) device.
522

523
  This is a single-node call.
524

525
  """
526
  params = [bdev.ToDict(), [disk.ToDict() for disk in ndevs]]
527
  c = Client("blockdev_removechildren", params)
528
  c.connect(node)
529
  c.run()
530
  return c.getresult().get(node, False)
531

    
532

    
533
def call_blockdev_getmirrorstatus(node, disks):
534
  """Request status of a (mirroring) device.
535

536
  This is a single-node call.
537

538
  """
539
  params = [dsk.ToDict() for dsk in disks]
540
  c = Client("blockdev_getmirrorstatus", params)
541
  c.connect(node)
542
  c.run()
543
  return c.getresult().get(node, False)
544

    
545

    
546
def call_blockdev_find(node, disk):
547
  """Request identification of a given block device.
548

549
  This is a single-node call.
550

551
  """
552
  c = Client("blockdev_find", [disk.ToDict()])
553
  c.connect(node)
554
  c.run()
555
  return c.getresult().get(node, False)
556

    
557

    
558
def call_blockdev_close(node, disks):
559
  """Closes the given block devices.
560

561
  This is a single-node call.
562

563
  """
564
  params = [cf.ToDict() for cf in disks]
565
  c = Client("blockdev_close", params)
566
  c.connect(node)
567
  c.run()
568
  return c.getresult().get(node, False)
569

    
570

    
571
def call_upload_file(node_list, file_name):
572
  """Upload a file.
573

574
  The node will refuse the operation in case the file is not on the
575
  approved file list.
576

577
  This is a multi-node call.
578

579
  """
580
  fh = file(file_name)
581
  try:
582
    data = fh.read()
583
  finally:
584
    fh.close()
585
  st = os.stat(file_name)
586
  params = [file_name, data, st.st_mode, st.st_uid, st.st_gid,
587
            st.st_atime, st.st_mtime]
588
  c = Client("upload_file", params)
589
  c.connect_list(node_list)
590
  c.run()
591
  return c.getresult()
592

    
593

    
594
def call_os_diagnose(node_list):
595
  """Request a diagnose of OS definitions.
596

597
  This is a multi-node call.
598

599
  """
600
  c = Client("os_diagnose", [])
601
  c.connect_list(node_list)
602
  c.run()
603
  result = c.getresult()
604
  new_result = {}
605
  for node_name in result:
606
    if result[node_name]:
607
      nr = [objects.OS.FromDict(oss) for oss in result[node_name]]
608
    else:
609
      nr = []
610
    new_result[node_name] = nr
611
  return new_result
612

    
613

    
614
def call_os_get(node, name):
615
  """Returns an OS definition.
616

617
  This is a single-node call.
618

619
  """
620
  c = Client("os_get", [name])
621
  c.connect(node)
622
  c.run()
623
  result = c.getresult().get(node, False)
624
  if isinstance(result, dict):
625
    return objects.OS.FromDict(result)
626
  else:
627
    return result
628

    
629

    
630
def call_hooks_runner(node_list, hpath, phase, env):
631
  """Call the hooks runner.
632

633
  Args:
634
    - op: the OpCode instance
635
    - env: a dictionary with the environment
636

637
  This is a multi-node call.
638

639
  """
640
  params = [hpath, phase, env]
641
  c = Client("hooks_runner", params)
642
  c.connect_list(node_list)
643
  c.run()
644
  result = c.getresult()
645
  return result
646

    
647

    
648
def call_iallocator_runner(node, name, idata):
649
  """Call an iallocator on a remote node
650

651
  Args:
652
    - name: the iallocator name
653
    - input: the json-encoded input string
654

655
  This is a single-node call.
656

657
  """
658
  params = [name, idata]
659
  c = Client("iallocator_runner", params)
660
  c.connect(node)
661
  c.run()
662
  result = c.getresult().get(node, False)
663
  return result
664

    
665

    
666
def call_blockdev_grow(node, cf_bdev, amount):
667
  """Request a snapshot of the given block device.
668

669
  This is a single-node call.
670

671
  """
672
  c = Client("blockdev_grow", [cf_bdev.ToDict(), amount])
673
  c.connect(node)
674
  c.run()
675
  return c.getresult().get(node, False)
676

    
677

    
678
def call_blockdev_snapshot(node, cf_bdev):
679
  """Request a snapshot of the given block device.
680

681
  This is a single-node call.
682

683
  """
684
  c = Client("blockdev_snapshot", [cf_bdev.ToDict()])
685
  c.connect(node)
686
  c.run()
687
  return c.getresult().get(node, False)
688

    
689

    
690
def call_snapshot_export(node, snap_bdev, dest_node, instance, cluster_name):
691
  """Request the export of a given snapshot.
692

693
  This is a single-node call.
694

695
  """
696
  params = [snap_bdev.ToDict(), dest_node, instance.ToDict(), cluster_name]
697
  c = Client("snapshot_export", params)
698
  c.connect(node)
699
  c.run()
700
  return c.getresult().get(node, False)
701

    
702

    
703
def call_finalize_export(node, instance, snap_disks):
704
  """Request the completion of an export operation.
705

706
  This writes the export config file, etc.
707

708
  This is a single-node call.
709

710
  """
711
  flat_disks = []
712
  for disk in snap_disks:
713
    flat_disks.append(disk.ToDict())
714
  params = [instance.ToDict(), flat_disks]
715
  c = Client("finalize_export", params)
716
  c.connect(node)
717
  c.run()
718
  return c.getresult().get(node, False)
719

    
720

    
721
def call_export_info(node, path):
722
  """Queries the export information in a given path.
723

724
  This is a single-node call.
725

726
  """
727
  c = Client("export_info", [path])
728
  c.connect(node)
729
  c.run()
730
  result = c.getresult().get(node, False)
731
  if not result:
732
    return result
733
  return objects.SerializableConfigParser.Loads(str(result))
734

    
735

    
736
def call_instance_os_import(node, inst, osdev, swapdev,
737
                            src_node, src_image, cluster_name):
738
  """Request the import of a backup into an instance.
739

740
  This is a single-node call.
741

742
  """
743
  params = [inst.ToDict(), osdev, swapdev, src_node, src_image, cluster_name]
744
  c = Client("instance_os_import", params)
745
  c.connect(node)
746
  c.run()
747
  return c.getresult().get(node, False)
748

    
749

    
750
def call_export_list(node_list):
751
  """Gets the stored exports list.
752

753
  This is a multi-node call.
754

755
  """
756
  c = Client("export_list", [])
757
  c.connect_list(node_list)
758
  c.run()
759
  result = c.getresult()
760
  return result
761

    
762

    
763
def call_export_remove(node, export):
764
  """Requests removal of a given export.
765

766
  This is a single-node call.
767

768
  """
769
  c = Client("export_remove", [export])
770
  c.connect(node)
771
  c.run()
772
  return c.getresult().get(node, False)
773

    
774

    
775
def call_node_leave_cluster(node):
776
  """Requests a node to clean the cluster information it has.
777

778
  This will remove the configuration information from the ganeti data
779
  dir.
780

781
  This is a single-node call.
782

783
  """
784
  c = Client("node_leave_cluster", [])
785
  c.connect(node)
786
  c.run()
787
  return c.getresult().get(node, False)
788

    
789

    
790
def call_node_volumes(node_list):
791
  """Gets all volumes on node(s).
792

793
  This is a multi-node call.
794

795
  """
796
  c = Client("node_volumes", [])
797
  c.connect_list(node_list)
798
  c.run()
799
  return c.getresult()
800

    
801

    
802
def call_test_delay(node_list, duration):
803
  """Sleep for a fixed time on given node(s).
804

805
  This is a multi-node call.
806

807
  """
808
  c = Client("test_delay", [duration])
809
  c.connect_list(node_list)
810
  c.run()
811
  return c.getresult()
812

    
813

    
814
def call_file_storage_dir_create(node, file_storage_dir):
815
  """Create the given file storage directory.
816

817
  This is a single-node call.
818

819
  """
820
  c = Client("file_storage_dir_create", [file_storage_dir])
821
  c.connect(node)
822
  c.run()
823
  return c.getresult().get(node, False)
824

    
825

    
826
def call_file_storage_dir_remove(node, file_storage_dir):
827
  """Remove the given file storage directory.
828

829
  This is a single-node call.
830

831
  """
832
  c = Client("file_storage_dir_remove", [file_storage_dir])
833
  c.connect(node)
834
  c.run()
835
  return c.getresult().get(node, False)
836

    
837

    
838
def call_file_storage_dir_rename(node, old_file_storage_dir,
839
                                 new_file_storage_dir):
840
  """Rename file storage directory.
841

842
  This is a single-node call.
843

844
  """
845
  c = Client("file_storage_dir_rename",
846
             [old_file_storage_dir, new_file_storage_dir])
847
  c.connect(node)
848
  c.run()
849
  return c.getresult().get(node, False)
850

    
851

    
852
def call_jobqueue_update(node_list, file_name, content):
853
  """Update job queue.
854

855
  This is a multi-node call.
856

857
  """
858
  c = Client("jobqueue_update", [file_name, content])
859
  c.connect_list(node_list)
860
  c.run()
861
  result = c.getresult()
862
  return result
863

    
864

    
865
def call_jobqueue_purge(node):
866
  """Purge job queue.
867

868
  This is a single-node call.
869

870
  """
871
  c = Client("jobqueue_purge", [])
872
  c.connect(node)
873
  c.run()
874
  return c.getresult().get(node, False)
875

    
876

    
877
def call_jobqueue_rename(node_list, old, new):
878
  """Rename a job queue file.
879

880
  This is a multi-node call.
881

882
  """
883
  c = Client("jobqueue_rename", [old, new])
884
  c.connect_list(node_list)
885
  c.run()
886
  result = c.getresult()
887
  return result