Statistics
| Branch: | Tag: | Revision:

root / lib / rpc.py @ 62c9ec92

History | View | Annotate | Download (18.6 kB)

1
#
2
#
3

    
4
# Copyright (C) 2006, 2007 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Inter-node RPC client library.
23

24
"""
25

    
26
# pylint: disable-msg=C0103
27

    
28
import os
29
import socket
30
import httplib
31

    
32
import simplejson
33

    
34
from ganeti import logger
35
from ganeti import utils
36
from ganeti import objects
37

    
38

    
39
class NodeController:
  """Node-handling class.

  For each node that we speak with, we create an instance of this
  class, so that we have a safe place to store the details of this
  individual call.

  The request is sent from the constructor and the reply is read
  later, by get_response().

  """
  def __init__(self, parent, node):
    """Connect to the node and send the request.

    Args:
      - parent: the owning client; its port, procedure and body
        attributes are read here
      - node: the node to contact (used as the HTTP host)

    A socket error is logged and remembered in self.failed instead of
    being raised.

    """
    self.parent = parent
    self.node = node
    self.failed = False

    self.http_conn = hc = httplib.HTTPConnection(node, parent.port)
    try:
      hc.connect()
      hc.putrequest('PUT', "/%s" % parent.procedure,
                    skip_accept_encoding=True)
      hc.putheader('Content-Length', str(len(parent.body)))
      hc.endheaders()
      hc.send(parent.body)
    except socket.error as err:
      logger.Error("Error connecting to %s: %s" % (node, str(err)))
      self.failed = True
      # do not keep a half-open socket around for a call that is dead
      hc.close()

  def get_response(self):
    """Try to process the response from the node.

    Returns:
      the JSON-decoded payload of the reply, or False if the request
      could not be sent, the reply could not be read, the status is not
      200, the Content-Length header is invalid or zero, or the payload
      is not valid JSON.

    The connection is closed before returning.

    """
    if self.failed:
      # we already failed in connect
      return False
    try:
      return self._read_reply()
    finally:
      # each controller handles exactly one call, so release the socket
      self.http_conn.close()

  def _read_reply(self):
    """Read and decode the reply, returning False on any failure.

    """
    try:
      resp = self.http_conn.getresponse()
      if resp.status != 200:
        return False
      try:
        length = int(resp.getheader('Content-Length', '0'))
      except ValueError:
        return False
      if not length:
        logger.Error("Zero-length reply from %s" % self.node)
        return False
      payload = resp.read(length)
    except (socket.error, httplib.HTTPException) as err:
      # a broken connection to one node must not abort the whole call
      logger.Error("Error reading reply from %s: %s" % (self.node, str(err)))
      return False
    try:
      return simplejson.loads(payload)
    except ValueError as err:
      logger.Error("Invalid reply from %s: %s" % (self.node, str(err)))
      return False
84

    
85

    
86
class Client:
  """RPC Client class.

  This class, given a (remote) method name, a list of parameters and a
  list of nodes, will contact all nodes, and return a dict of results
  (key: node name, value: result).

  One current bug is that generic failure is still signalled by
  'False' result, which is not good. This overloading of values can
  cause bugs.

  """
  # Class-level attributes that nothing in this class reads or writes;
  # kept only in case external code still refers to them. Note that
  # allresult is a mutable list shared by all instances.
  result_set = False
  result = False
  allresult = []

  def __init__(self, procedure, args):
    """Prepare a call of the given remote procedure.

    Args:
      - procedure: the name of the remote procedure
      - args: the list of arguments; it is JSON-encoded here, once, and
        the same body is sent to every node

    """
    self.port = utils.GetNodeDaemonPort()
    self.nodepw = utils.GetNodeDaemonPassword()
    # node name -> NodeController
    self.nc = {}
    # node name -> decoded reply (or False)
    self.results = {}
    self.procedure = procedure
    self.args = args
    self.body = simplejson.dumps(args)

  #--- generic connector -------------

  def connect_list(self, node_list):
    """Add a list of nodes to the target nodes.

    """
    for node in node_list:
      self.connect(node)

  def connect(self, connect_node):
    """Add a node to the target list.

    Creating the NodeController already sends the request to the node.

    """
    self.nc[connect_node] = NodeController(self, connect_node)

  def getresult(self):
    """Return the results of the call.

    The dict is only filled in by run().

    """
    return self.results

  def run(self):
    """Collect the replies of all the nodes added so far.

    Each node's reply (or False on failure) is stored in the results
    dict under the node name. Does nothing if no node was added.

    """
    for node, nc in self.nc.items():
      self.results[node] = nc.get_response()
141

    
142

    
143
def call_volume_list(node_list, vg_name):
  """Lists the logical volumes of the given volume group.

  This is a multi-node call; the result is keyed by node name.

  """
  client = Client("volume_list", [vg_name])
  client.connect_list(node_list)
  client.run()
  results = client.getresult()
  return results
153

    
154

    
155
def call_vg_list(node_list):
  """Queries the volume group list of the given nodes.

  This is a multi-node call; the result is keyed by node name.

  """
  client = Client("vg_list", [])
  client.connect_list(node_list)
  client.run()
  results = client.getresult()
  return results
165

    
166

    
167
def call_bridges_exist(node, bridges_list):
  """Asks a node whether it has all the given bridges.

  The remote node checks that every bridge in bridges_list is present,
  so that an instance using interfaces on those bridges can be started
  there.

  This is a single-node call; False is returned if the node gave no
  result.

  """
  client = Client("bridges_exist", [bridges_list])
  client.connect(node)
  client.run()
  results = client.getresult()
  return results.get(node, False)
181

    
182

    
183
def call_instance_start(node, instance, extra_args):
  """Asks a node to start an instance.

  The instance is sent in its ToDict() form, followed by extra_args.

  This is a single-node call; False is returned if the node gave no
  result.

  """
  params = [instance.ToDict(), extra_args]
  client = Client("instance_start", params)
  client.connect(node)
  client.run()
  results = client.getresult()
  return results.get(node, False)
193

    
194

    
195
def call_instance_shutdown(node, instance):
  """Asks a node to stop an instance.

  This is a single-node call; False is returned if the node gave no
  result.

  """
  params = [instance.ToDict()]
  client = Client("instance_shutdown", params)
  client.connect(node)
  client.run()
  results = client.getresult()
  return results.get(node, False)
205

    
206

    
207
def call_instance_migrate(node, instance, target, live):
  """Asks a node to migrate an instance.

  Only the instance name is sent, together with target and live.

  This is a single-node call; False is returned if the node gave no
  result.

  """
  params = [instance.name, target, live]
  client = Client("instance_migrate", params)
  client.connect(node)
  client.run()
  results = client.getresult()
  return results.get(node, False)
217

    
218

    
219
def call_instance_reboot(node, instance, reboot_type, extra_args):
  """Asks a node to reboot an instance.

  This is a single-node call; False is returned if the node gave no
  result.

  """
  params = [instance.ToDict(), reboot_type, extra_args]
  client = Client("instance_reboot", params)
  client.connect(node)
  client.run()
  results = client.getresult()
  return results.get(node, False)
229

    
230

    
231
def call_instance_os_add(node, inst, osdev, swapdev):
  """Asks a node to install the OS on the given instance.

  This is a single-node call; False is returned if the node gave no
  result.

  """
  client = Client("instance_os_add", [inst.ToDict(), osdev, swapdev])
  client.connect(node)
  client.run()
  results = client.getresult()
  return results.get(node, False)
242

    
243

    
244
def call_instance_run_rename(node, inst, old_name, osdev, swapdev):
  """Asks a node to run the OS rename script for an instance.

  This is a single-node call; False is returned if the node gave no
  result.

  """
  client = Client("instance_run_rename",
                  [inst.ToDict(), old_name, osdev, swapdev])
  client.connect(node)
  client.run()
  results = client.getresult()
  return results.get(node, False)
255

    
256

    
257
def call_instance_info(node, instance):
  """Queries a node for information about a single instance.

  The instance argument is sent as is.

  This is a single-node call; False is returned if the node gave no
  result.

  """
  client = Client("instance_info", [instance])
  client.connect(node)
  client.run()
  results = client.getresult()
  return results.get(node, False)
267

    
268

    
269
def call_all_instances_info(node_list):
  """Queries information about all instances on the given nodes.

  This is a multi-node call; the result is keyed by node name.

  """
  client = Client("all_instances_info", [])
  client.connect_list(node_list)
  client.run()
  results = client.getresult()
  return results
279

    
280

    
281
def call_instance_list(node_list):
  """Queries the list of running instances on the given nodes.

  This is a multi-node call; the result is keyed by node name.

  """
  client = Client("instance_list", [])
  client.connect_list(node_list)
  client.run()
  results = client.getresult()
  return results
291

    
292

    
293
def call_node_tcp_ping(node, source, target, port, timeout, live_port_needed):
  """Asks a node to do a TcpPing.

  All the arguments after node are sent to the remote node unchanged.

  This is a single-node call; False is returned if the node gave no
  result.

  """
  params = [source, target, port, timeout, live_port_needed]
  client = Client("node_tcp_ping", params)
  client.connect(node)
  client.run()
  results = client.getresult()
  return results.get(node, False)
303

    
304

    
305
def call_node_info(node_list, vg_name):
  """Return node information.

  This will return memory information and volume group size and free
  space.

  Every reply is passed through utils.CheckDict together with a dict of
  placeholder values for the expected keys.

  This is a multi-node call; the result is keyed by node name.

  """
  c = Client("node_info", [vg_name])
  c.connect_list(node_list)
  c.run()
  retux = c.getresult()

  for node_name, ret in retux.items():
    if not isinstance(ret, dict):
      logger.Error("could not connect to node %s" % (node_name))
      # NOTE(review): this placeholder dict is never stored back into
      # retux, so the caller still sees the original non-dict reply for
      # this node - confirm that this is intended
      ret = {}

    utils.CheckDict(ret,
                    { 'memory_total' : '-',
                      'memory_dom0' : '-',
                      'memory_free' : '-',
                      'vg_size' : 'node_unreachable',
                      'vg_free' : '-' },
                    "call_node_info",
                    )
  return retux
334

    
335

    
336
def call_node_add(node, dsa, dsapub, rsa, rsapub, ssh, sshpub):
  """Asks a node to join the cluster.

  The six key arguments are sent to the node in the order given.

  This is a single-node call; False is returned if the node gave no
  result.

  """
  client = Client("node_add", [dsa, dsapub, rsa, rsapub, ssh, sshpub])
  client.connect(node)
  client.run()
  results = client.getresult()
  return results.get(node, False)
347

    
348

    
349
def call_node_verify(node_list, checkdict, cluster_name):
  """Asks the nodes to verify the given parameters.

  This is a multi-node call; the result is keyed by node name.

  """
  params = [checkdict, cluster_name]
  client = Client("node_verify", params)
  client.connect_list(node_list)
  client.run()
  results = client.getresult()
  return results
359

    
360

    
361
def call_node_start_master(node, start_daemons):
  """Asks a node to activate itself as a master.

  This is a single-node call; False is returned if the node gave no
  result.

  """
  client = Client("node_start_master", [start_daemons])
  client.connect(node)
  client.run()
  results = client.getresult()
  return results.get(node, False)
371

    
372

    
373
def call_node_stop_master(node, stop_daemons):
  """Asks a node to demote itself from master status.

  This is a single-node call; False is returned if the node gave no
  result.

  """
  client = Client("node_stop_master", [stop_daemons])
  client.connect(node)
  client.run()
  results = client.getresult()
  return results.get(node, False)
383

    
384

    
385
def call_master_info(node_list):
  """Queries the master info of the given nodes.

  This is a multi-node call; the result is keyed by node name.

  """
  client = Client("master_info", [])
  client.connect_list(node_list)
  client.run()
  results = client.getresult()
  return results
395

    
396

    
397
def call_version(node_list):
  """Queries the version of the given nodes.

  This is a multi-node call; the result is keyed by node name.

  """
  client = Client("version", [])
  client.connect_list(node_list)
  client.run()
  results = client.getresult()
  return results
407

    
408

    
409
def call_blockdev_create(node, bdev, size, owner, on_primary, info):
  """Asks a node to create the given block device.

  The device is sent in its ToDict() form; the other arguments are
  sent unchanged.

  This is a single-node call; False is returned if the node gave no
  result.

  """
  client = Client("blockdev_create",
                  [bdev.ToDict(), size, owner, on_primary, info])
  client.connect(node)
  client.run()
  results = client.getresult()
  return results.get(node, False)
420

    
421

    
422
def call_blockdev_remove(node, bdev):
  """Asks a node to remove the given block device.

  This is a single-node call; False is returned if the node gave no
  result.

  """
  params = [bdev.ToDict()]
  client = Client("blockdev_remove", params)
  client.connect(node)
  client.run()
  results = client.getresult()
  return results.get(node, False)
432

    
433

    
434
def call_blockdev_rename(node, devlist):
  """Asks a node to rename the given block devices.

  devlist is a sequence of (device, uid) pairs; each device is sent in
  its ToDict() form, paired with its uid.

  This is a single-node call; False is returned if the node gave no
  result.

  """
  pairs = [(dev.ToDict(), dev_uid) for dev, dev_uid in devlist]
  client = Client("blockdev_rename", pairs)
  client.connect(node)
  client.run()
  results = client.getresult()
  return results.get(node, False)
445

    
446

    
447
def call_blockdev_assemble(node, disk, owner, on_primary):
  """Asks a node to assemble the given block device.

  This is a single-node call; False is returned if the node gave no
  result.

  """
  client = Client("blockdev_assemble", [disk.ToDict(), owner, on_primary])
  client.connect(node)
  client.run()
  results = client.getresult()
  return results.get(node, False)
458

    
459

    
460
def call_blockdev_shutdown(node, disk):
  """Asks a node to shut down the given block device.

  This is a single-node call; False is returned if the node gave no
  result.

  """
  params = [disk.ToDict()]
  client = Client("blockdev_shutdown", params)
  client.connect(node)
  client.run()
  results = client.getresult()
  return results.get(node, False)
470

    
471

    
472
def call_blockdev_addchildren(node, bdev, ndevs):
  """Asks a node to add a list of children to a (mirroring) device.

  Both the parent device and every child are sent in ToDict() form.

  This is a single-node call; False is returned if the node gave no
  result.

  """
  parent = bdev.ToDict()
  children = [child.ToDict() for child in ndevs]
  client = Client("blockdev_addchildren", [parent, children])
  client.connect(node)
  client.run()
  results = client.getresult()
  return results.get(node, False)
483

    
484

    
485
def call_blockdev_removechildren(node, bdev, ndevs):
  """Asks a node to remove a list of children from a (mirroring) device.

  Both the parent device and every child are sent in ToDict() form.

  This is a single-node call; False is returned if the node gave no
  result.

  """
  parent = bdev.ToDict()
  children = [child.ToDict() for child in ndevs]
  client = Client("blockdev_removechildren", [parent, children])
  client.connect(node)
  client.run()
  results = client.getresult()
  return results.get(node, False)
496

    
497

    
498
def call_blockdev_getmirrorstatus(node, disks):
  """Asks a node for the status of (mirroring) devices.

  The ToDict() forms of the disks are themselves the argument list of
  the remote call.

  This is a single-node call; False is returned if the node gave no
  result.

  """
  flat_disks = [disk.ToDict() for disk in disks]
  client = Client("blockdev_getmirrorstatus", flat_disks)
  client.connect(node)
  client.run()
  results = client.getresult()
  return results.get(node, False)
509

    
510

    
511
def call_blockdev_find(node, disk):
  """Asks a node to identify the given block device.

  This is a single-node call; False is returned if the node gave no
  result.

  """
  params = [disk.ToDict()]
  client = Client("blockdev_find", params)
  client.connect(node)
  client.run()
  results = client.getresult()
  return results.get(node, False)
521

    
522

    
523
def call_blockdev_close(node, disks):
  """Asks a node to close the given block devices.

  The ToDict() forms of the disks are themselves the argument list of
  the remote call.

  This is a single-node call; False is returned if the node gave no
  result.

  """
  flat_disks = [disk.ToDict() for disk in disks]
  client = Client("blockdev_close", flat_disks)
  client.connect(node)
  client.run()
  results = client.getresult()
  return results.get(node, False)
534

    
535

    
536
def call_upload_file(node_list, file_name):
  """Upload a file.

  The contents of the file are sent together with its name, mode,
  owner uid, group gid, atime and mtime.

  The node will refuse the operation in case the file is not on the
  approved file list.

  This is a multi-node call; the result is keyed by node name.

  """
  # open() rather than file(), which only exists in Python 2
  fh = open(file_name)
  try:
    data = fh.read()
    # stat the open handle, so that the metadata belongs to the very
    # file whose contents were just read, even if the path is replaced
    # in the meantime
    st = os.fstat(fh.fileno())
  finally:
    fh.close()
  params = [file_name, data, st.st_mode, st.st_uid, st.st_gid,
            st.st_atime, st.st_mtime]
  c = Client("upload_file", params)
  c.connect_list(node_list)
  c.run()
  return c.getresult()
557

    
558

    
559
def call_os_diagnose(node_list):
  """Asks the nodes for a diagnose of their OS definitions.

  Every non-empty reply is converted into a list of objects.OS
  instances; an empty or failed reply becomes an empty list.

  This is a multi-node call; the result is keyed by node name.

  """
  client = Client("os_diagnose", [])
  client.connect_list(node_list)
  client.run()
  per_node = {}
  for node_name, reply in client.getresult().items():
    if not reply:
      per_node[node_name] = []
    else:
      per_node[node_name] = [objects.OS.FromDict(entry) for entry in reply]
  return per_node
577

    
578

    
579
def call_os_get(node, name):
  """Asks a node for an OS definition.

  A dict reply is converted into an objects.OS instance; any other
  reply (including False when the node gave no result) is returned as
  is.

  This is a single-node call.

  """
  client = Client("os_get", [name])
  client.connect(node)
  client.run()
  reply = client.getresult().get(node, False)
  if not isinstance(reply, dict):
    return reply
  return objects.OS.FromDict(reply)
593

    
594

    
595
def call_hooks_runner(node_list, hpath, phase, env):
  """Call the hooks runner.

  Args:
    - hpath: sent to the nodes unchanged
    - phase: sent to the nodes unchanged
    - env: a dictionary with the environment

  This is a multi-node call; the result is keyed by node name.

  """
  client = Client("hooks_runner", [hpath, phase, env])
  client.connect_list(node_list)
  client.run()
  return client.getresult()
611

    
612

    
613
def call_iallocator_runner(node, name, idata):
  """Call an iallocator on a remote node.

  Args:
    - name: the iallocator name
    - idata: the json-encoded input string

  This is a single-node call; False is returned if the node gave no
  result.

  """
  client = Client("iallocator_runner", [name, idata])
  client.connect(node)
  client.run()
  return client.getresult().get(node, False)
629

    
630

    
631
def call_blockdev_grow(node, cf_bdev, amount):
  """Asks a node to grow the given block device.

  The device is sent in its ToDict() form, followed by amount.

  This is a single-node call; False is returned if the node gave no
  result.

  """
  params = [cf_bdev.ToDict(), amount]
  client = Client("blockdev_grow", params)
  client.connect(node)
  client.run()
  results = client.getresult()
  return results.get(node, False)
641

    
642

    
643
def call_blockdev_snapshot(node, cf_bdev):
  """Asks a node for a snapshot of the given block device.

  This is a single-node call; False is returned if the node gave no
  result.

  """
  params = [cf_bdev.ToDict()]
  client = Client("blockdev_snapshot", params)
  client.connect(node)
  client.run()
  results = client.getresult()
  return results.get(node, False)
653

    
654

    
655
def call_snapshot_export(node, snap_bdev, dest_node, instance, cluster_name):
  """Asks a node to export the given snapshot.

  The snapshot device and the instance are sent in ToDict() form.

  This is a single-node call; False is returned if the node gave no
  result.

  """
  client = Client("snapshot_export",
                  [snap_bdev.ToDict(), dest_node, instance.ToDict(),
                   cluster_name])
  client.connect(node)
  client.run()
  results = client.getresult()
  return results.get(node, False)
666

    
667

    
668
def call_finalize_export(node, instance, snap_disks):
  """Asks a node to complete an export operation.

  This writes the export config file, etc.

  The instance and every snapshot disk are sent in ToDict() form.

  This is a single-node call; False is returned if the node gave no
  result.

  """
  # the disks are converted before the instance, as before
  flat_disks = [disk.ToDict() for disk in snap_disks]
  client = Client("finalize_export", [instance.ToDict(), flat_disks])
  client.connect(node)
  client.run()
  results = client.getresult()
  return results.get(node, False)
684

    
685

    
686
def call_export_info(node, path):
  """Asks a node for the export information in a given path.

  A non-empty reply is converted to a string and loaded through
  objects.SerializableConfigParser.Loads; an empty or missing reply is
  returned as is.

  This is a single-node call.

  """
  client = Client("export_info", [path])
  client.connect(node)
  client.run()
  reply = client.getresult().get(node, False)
  if reply:
    return objects.SerializableConfigParser.Loads(str(reply))
  return reply
699

    
700

    
701
def call_instance_os_import(node, inst, osdev, swapdev,
                            src_node, src_image, cluster_name):
  """Asks a node to import a backup into an instance.

  The instance is sent in its ToDict() form; the other arguments are
  sent unchanged.

  This is a single-node call; False is returned if the node gave no
  result.

  """
  client = Client("instance_os_import",
                  [inst.ToDict(), osdev, swapdev, src_node, src_image,
                   cluster_name])
  client.connect(node)
  client.run()
  results = client.getresult()
  return results.get(node, False)
713

    
714

    
715
def call_export_list(node_list):
  """Queries the list of exports stored on the given nodes.

  This is a multi-node call; the result is keyed by node name.

  """
  client = Client("export_list", [])
  client.connect_list(node_list)
  client.run()
  return client.getresult()
726

    
727

    
728
def call_export_remove(node, export):
  """Asks a node to remove the given export.

  This is a single-node call; False is returned if the node gave no
  result.

  """
  client = Client("export_remove", [export])
  client.connect(node)
  client.run()
  results = client.getresult()
  return results.get(node, False)
738

    
739

    
740
def call_node_leave_cluster(node):
  """Asks a node to clean the cluster information it has.

  This will remove the configuration information from the ganeti data
  dir.

  This is a single-node call; False is returned if the node gave no
  result.

  """
  client = Client("node_leave_cluster", [])
  client.connect(node)
  client.run()
  results = client.getresult()
  return results.get(node, False)
753

    
754

    
755
def call_node_volumes(node_list):
  """Queries all the volumes on the given node(s).

  This is a multi-node call; the result is keyed by node name.

  """
  client = Client("node_volumes", [])
  client.connect_list(node_list)
  client.run()
  results = client.getresult()
  return results
765

    
766

    
767
def call_test_delay(node_list, duration):
  """Asks the given node(s) to sleep for a fixed time.

  This is a multi-node call; the result is keyed by node name.

  """
  client = Client("test_delay", [duration])
  client.connect_list(node_list)
  client.run()
  results = client.getresult()
  return results
777

    
778

    
779
def call_file_storage_dir_create(node, file_storage_dir):
  """Asks a node to create the given file storage directory.

  This is a single-node call; False is returned if the node gave no
  result.

  """
  params = [file_storage_dir]
  client = Client("file_storage_dir_create", params)
  client.connect(node)
  client.run()
  results = client.getresult()
  return results.get(node, False)
789

    
790

    
791
def call_file_storage_dir_remove(node, file_storage_dir):
  """Asks a node to remove the given file storage directory.

  This is a single-node call; False is returned if the node gave no
  result.

  """
  params = [file_storage_dir]
  client = Client("file_storage_dir_remove", params)
  client.connect(node)
  client.run()
  results = client.getresult()
  return results.get(node, False)
801

    
802

    
803
def call_file_storage_dir_rename(node, old_file_storage_dir,
                                 new_file_storage_dir):
  """Asks a node to rename a file storage directory.

  The old directory is sent first, then the new one.

  This is a single-node call; False is returned if the node gave no
  result.

  """
  params = [old_file_storage_dir, new_file_storage_dir]
  client = Client("file_storage_dir_rename", params)
  client.connect(node)
  client.run()
  results = client.getresult()
  return results.get(node, False)
815

    
816

    
817
def call_jobqueue_update(node_list, file_name, content):
  """Sends a job queue update to the given nodes.

  The file name and the content are sent unchanged.

  This is a multi-node call; the result is keyed by node name.

  """
  client = Client("jobqueue_update", [file_name, content])
  client.connect_list(node_list)
  client.run()
  return client.getresult()
828

    
829

    
830
def call_jobqueue_purge(node):
  """Asks a node to purge its job queue.

  This is a single-node call; False is returned if the node gave no
  result.

  """
  client = Client("jobqueue_purge", [])
  client.connect(node)
  client.run()
  results = client.getresult()
  return results.get(node, False)
840

    
841

    
842
def call_jobqueue_rename(node_list, old, new):
  """Asks the given nodes to rename a job queue file.

  The old name is sent first, then the new one.

  This is a multi-node call; the result is keyed by node name.

  """
  client = Client("jobqueue_rename", [old, new])
  client.connect_list(node_list)
  client.run()
  return client.getresult()