Statistics
| Branch: | Tag: | Revision:

root / lib / rpc.py @ b74159ee

History | View | Annotate | Download (16.9 kB)

1
#
2
#
3

    
4
# Copyright (C) 2006, 2007 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Script to show add a new node to the cluster
23

24
"""
25

    
26
# pylint: disable-msg=C0103
27

    
28
import os
29
import socket
30
import httplib
31

    
32
import simplejson
33

    
34
from ganeti import logger
35
from ganeti import utils
36
from ganeti import errors
37
from ganeti import constants
38
from ganeti import objects
39
from ganeti import ssconf
40

    
41

    
42
class NodeController:
43
  """Node-handling class.
44

45
  For each node that we speak with, we create an instance of this
46
  class, so that we have a safe place to store the details of this
47
  individual call.
48

49
  """
50
  def __init__(self, parent, node):
51
    self.parent = parent
52
    self.node = node
53
    self.failed = False
54

    
55
    self.http_conn = hc = httplib.HTTPConnection(node, self.parent.port)
56
    try:
57
      hc.connect()
58
      hc.putrequest('PUT', "/%s" % self.parent.procedure,
59
                    skip_accept_encoding=True)
60
      hc.putheader('Content-Length', str(len(parent.body)))
61
      hc.endheaders()
62
      hc.send(parent.body)
63
    except socket.error, err:
64
      logger.Error("Error connecting to %s: %s" % (node, str(err)))
65
      self.failed = True
66

    
67
  def get_response(self):
68
    """Try to process the response from the node.
69

70
    """
71
    if self.failed:
72
      # we already failed in connect
73
      return False
74
    resp = self.http_conn.getresponse()
75
    if resp.status != 200:
76
      return False
77
    try:
78
      length = int(resp.getheader('Content-Length', '0'))
79
    except ValueError:
80
      return False
81
    if not length:
82
      logger.Error("Zero-length reply from %s" % self.node)
83
      return False
84
    payload = resp.read(length)
85
    unload = simplejson.loads(payload)
86
    return unload
87

    
88

    
89
class Client:
90
  """RPC Client class.
91

92
  This class, given a (remote) method name, a list of parameters and a
93
  list of nodes, will contact (in parallel) all nodes, and return a
94
  dict of results (key: node name, value: result).
95

96
  One current bug is that generic failure is still signalled by
97
  'False' result, which is not good. This overloading of values can
98
  cause bugs.
99

100
  """
101
  result_set = False
102
  result = False
103
  allresult = []
104

    
105
  def __init__(self, procedure, args):
106
    ss = ssconf.SimpleStore()
107
    self.port = ss.GetNodeDaemonPort()
108
    self.nodepw = ss.GetNodeDaemonPassword()
109
    self.nc = {}
110
    self.results = {}
111
    self.procedure = procedure
112
    self.args = args
113
    self.body = simplejson.dumps(args)
114

    
115
  #--- generic connector -------------
116

    
117
  def connect_list(self, node_list):
118
    """Add a list of nodes to the target nodes.
119

120
    """
121
    for node in node_list:
122
      self.connect(node)
123

    
124
  def connect(self, connect_node):
125
    """Add a node to the target list.
126

127
    """
128
    self.nc[connect_node] = nc = NodeController(self, connect_node)
129

    
130
  def getresult(self):
131
    """Return the results of the call.
132

133
    """
134
    return self.results
135

    
136
  def run(self):
137
    """Wrapper over reactor.run().
138

139
    This function simply calls reactor.run() if we have any requests
140
    queued, otherwise it does nothing.
141

142
    """
143
    for node, nc in self.nc.items():
144
      self.results[node] = nc.get_response()
145

    
146

    
147
def call_volume_list(node_list, vg_name):
148
  """Gets the logical volumes present in a given volume group.
149

150
  This is a multi-node call.
151

152
  """
153
  c = Client("volume_list", [vg_name])
154
  c.connect_list(node_list)
155
  c.run()
156
  return c.getresult()
157

    
158

    
159
def call_vg_list(node_list):
160
  """Gets the volume group list.
161

162
  This is a multi-node call.
163

164
  """
165
  c = Client("vg_list", [])
166
  c.connect_list(node_list)
167
  c.run()
168
  return c.getresult()
169

    
170

    
171
def call_bridges_exist(node, bridges_list):
172
  """Checks if a node has all the bridges given.
173

174
  This method checks if all bridges given in the bridges_list are
175
  present on the remote node, so that an instance that uses interfaces
176
  on those bridges can be started.
177

178
  This is a single-node call.
179

180
  """
181
  c = Client("bridges_exist", [bridges_list])
182
  c.connect(node)
183
  c.run()
184
  return c.getresult().get(node, False)
185

    
186

    
187
def call_instance_start(node, instance, extra_args):
188
  """Starts an instance.
189

190
  This is a single-node call.
191

192
  """
193
  c = Client("instance_start", [instance.ToDict(), extra_args])
194
  c.connect(node)
195
  c.run()
196
  return c.getresult().get(node, False)
197

    
198

    
199
def call_instance_shutdown(node, instance):
200
  """Stops an instance.
201

202
  This is a single-node call.
203

204
  """
205
  c = Client("instance_shutdown", [instance.ToDict()])
206
  c.connect(node)
207
  c.run()
208
  return c.getresult().get(node, False)
209

    
210

    
211
def call_instance_reboot(node, instance, reboot_type, extra_args):
212
  """Reboots an instance.
213

214
  This is a single-node call.
215

216
  """
217
  c = Client("instance_reboot", [instance.ToDict(), reboot_type, extra_args])
218
  c.connect(node)
219
  c.run()
220
  return c.getresult().get(node, False)
221

    
222

    
223
def call_instance_os_add(node, inst, osdev, swapdev):
224
  """Installs an OS on the given instance.
225

226
  This is a single-node call.
227

228
  """
229
  params = [inst.ToDict(), osdev, swapdev]
230
  c = Client("instance_os_add", params)
231
  c.connect(node)
232
  c.run()
233
  return c.getresult().get(node, False)
234

    
235

    
236
def call_instance_run_rename(node, inst, old_name, osdev, swapdev):
237
  """Run the OS rename script for an instance.
238

239
  This is a single-node call.
240

241
  """
242
  params = [inst.ToDict(), old_name, osdev, swapdev]
243
  c = Client("instance_run_rename", params)
244
  c.connect(node)
245
  c.run()
246
  return c.getresult().get(node, False)
247

    
248

    
249
def call_instance_info(node, instance):
250
  """Returns information about a single instance.
251

252
  This is a single-node call.
253

254
  """
255
  c = Client("instance_info", [instance])
256
  c.connect(node)
257
  c.run()
258
  return c.getresult().get(node, False)
259

    
260

    
261
def call_all_instances_info(node_list):
262
  """Returns information about all instances on a given node.
263

264
  This is a single-node call.
265

266
  """
267
  c = Client("all_instances_info", [])
268
  c.connect_list(node_list)
269
  c.run()
270
  return c.getresult()
271

    
272

    
273
def call_instance_list(node_list):
274
  """Returns the list of running instances on a given node.
275

276
  This is a single-node call.
277

278
  """
279
  c = Client("instance_list", [])
280
  c.connect_list(node_list)
281
  c.run()
282
  return c.getresult()
283

    
284

    
285
def call_node_tcp_ping(node, source, target, port, timeout, live_port_needed):
286
  """Do a TcpPing on the remote node
287

288
  This is a single-node call.
289
  """
290
  c = Client("node_tcp_ping", [source, target, port, timeout,
291
                               live_port_needed])
292
  c.connect(node)
293
  c.run()
294
  return c.getresult().get(node, False)
295

    
296

    
297
def call_node_info(node_list, vg_name):
298
  """Return node information.
299

300
  This will return memory information and volume group size and free
301
  space.
302

303
  This is a multi-node call.
304

305
  """
306
  c = Client("node_info", [vg_name])
307
  c.connect_list(node_list)
308
  c.run()
309
  retux = c.getresult()
310

    
311
  for node_name in retux:
312
    ret = retux.get(node_name, False)
313
    if type(ret) != dict:
314
      logger.Error("could not connect to node %s" % (node_name))
315
      ret = {}
316

    
317
    utils.CheckDict(ret,
318
                    { 'memory_total' : '-',
319
                      'memory_dom0' : '-',
320
                      'memory_free' : '-',
321
                      'vg_size' : 'node_unreachable',
322
                      'vg_free' : '-' },
323
                    "call_node_info",
324
                    )
325
  return retux
326

    
327

    
328
def call_node_add(node, dsa, dsapub, rsa, rsapub, ssh, sshpub):
329
  """Add a node to the cluster.
330

331
  This is a single-node call.
332

333
  """
334
  params = [dsa, dsapub, rsa, rsapub, ssh, sshpub]
335
  c = Client("node_add", params)
336
  c.connect(node)
337
  c.run()
338
  return c.getresult().get(node, False)
339

    
340

    
341
def call_node_verify(node_list, checkdict):
342
  """Request verification of given parameters.
343

344
  This is a multi-node call.
345

346
  """
347
  c = Client("node_verify", [checkdict])
348
  c.connect_list(node_list)
349
  c.run()
350
  return c.getresult()
351

    
352

    
353
def call_node_start_master(node):
354
  """Tells a node to activate itself as a master.
355

356
  This is a single-node call.
357

358
  """
359
  c = Client("node_start_master", [])
360
  c.connect(node)
361
  c.run()
362
  return c.getresult().get(node, False)
363

    
364

    
365
def call_node_stop_master(node):
366
  """Tells a node to demote itself from master status.
367

368
  This is a single-node call.
369

370
  """
371
  c = Client("node_stop_master", [])
372
  c.connect(node)
373
  c.run()
374
  return c.getresult().get(node, False)
375

    
376

    
377
def call_version(node_list):
378
  """Query node version.
379

380
  This is a multi-node call.
381

382
  """
383
  c = Client("version", [])
384
  c.connect_list(node_list)
385
  c.run()
386
  return c.getresult()
387

    
388

    
389
def call_blockdev_create(node, bdev, size, owner, on_primary, info):
390
  """Request creation of a given block device.
391

392
  This is a single-node call.
393

394
  """
395
  params = [bdev.ToDict(), size, owner, on_primary, info]
396
  c = Client("blockdev_create", params)
397
  c.connect(node)
398
  c.run()
399
  return c.getresult().get(node, False)
400

    
401

    
402
def call_blockdev_remove(node, bdev):
403
  """Request removal of a given block device.
404

405
  This is a single-node call.
406

407
  """
408
  c = Client("blockdev_remove", [bdev.ToDict()])
409
  c.connect(node)
410
  c.run()
411
  return c.getresult().get(node, False)
412

    
413

    
414
def call_blockdev_rename(node, devlist):
415
  """Request rename of the given block devices.
416

417
  This is a single-node call.
418

419
  """
420
  params = [(d.ToDict(), uid) for d, uid in devlist]
421
  c = Client("blockdev_rename", params)
422
  c.connect(node)
423
  c.run()
424
  return c.getresult().get(node, False)
425

    
426

    
427
def call_blockdev_assemble(node, disk, owner, on_primary):
428
  """Request assembling of a given block device.
429

430
  This is a single-node call.
431

432
  """
433
  params = [disk.ToDict(), owner, on_primary]
434
  c = Client("blockdev_assemble", params)
435
  c.connect(node)
436
  c.run()
437
  return c.getresult().get(node, False)
438

    
439

    
440
def call_blockdev_shutdown(node, disk):
441
  """Request shutdown of a given block device.
442

443
  This is a single-node call.
444

445
  """
446
  c = Client("blockdev_shutdown", [disk.ToDict()])
447
  c.connect(node)
448
  c.run()
449
  return c.getresult().get(node, False)
450

    
451

    
452
def call_blockdev_addchildren(node, bdev, ndevs):
453
  """Request adding a list of children to a (mirroring) device.
454

455
  This is a single-node call.
456

457
  """
458
  params = [bdev.ToDict(), [disk.ToDict() for disk in ndevs]]
459
  c = Client("blockdev_addchildren", params)
460
  c.connect(node)
461
  c.run()
462
  return c.getresult().get(node, False)
463

    
464

    
465
def call_blockdev_removechildren(node, bdev, ndevs):
466
  """Request removing a list of children from a (mirroring) device.
467

468
  This is a single-node call.
469

470
  """
471
  params = [bdev.ToDict(), [disk.ToDict() for disk in ndevs]]
472
  c = Client("blockdev_removechildren", params)
473
  c.connect(node)
474
  c.run()
475
  return c.getresult().get(node, False)
476

    
477

    
478
def call_blockdev_getmirrorstatus(node, disks):
479
  """Request status of a (mirroring) device.
480

481
  This is a single-node call.
482

483
  """
484
  params = [dsk.ToDict() for dsk in disks]
485
  c = Client("blockdev_getmirrorstatus", params)
486
  c.connect(node)
487
  c.run()
488
  return c.getresult().get(node, False)
489

    
490

    
491
def call_blockdev_find(node, disk):
492
  """Request identification of a given block device.
493

494
  This is a single-node call.
495

496
  """
497
  c = Client("blockdev_find", [disk.ToDict()])
498
  c.connect(node)
499
  c.run()
500
  return c.getresult().get(node, False)
501

    
502

    
503
def call_upload_file(node_list, file_name):
504
  """Upload a file.
505

506
  The node will refuse the operation in case the file is not on the
507
  approved file list.
508

509
  This is a multi-node call.
510

511
  """
512
  fh = file(file_name)
513
  try:
514
    data = fh.read()
515
  finally:
516
    fh.close()
517
  st = os.stat(file_name)
518
  params = [file_name, data, st.st_mode, st.st_uid, st.st_gid,
519
            st.st_atime, st.st_mtime]
520
  c = Client("upload_file", params)
521
  c.connect_list(node_list)
522
  c.run()
523
  return c.getresult()
524

    
525

    
526
def call_os_diagnose(node_list):
527
  """Request a diagnose of OS definitions.
528

529
  This is a multi-node call.
530

531
  """
532
  c = Client("os_diagnose", [])
533
  c.connect_list(node_list)
534
  c.run()
535
  result = c.getresult()
536
  new_result = {}
537
  for node_name in result:
538
    if result[node_name]:
539
      nr = [objects.OS.FromDict(oss) for oss in result[node_name]]
540
    else:
541
      nr = []
542
    new_result[node_name] = nr
543
  return new_result
544

    
545

    
546
def call_os_get(node, name):
547
  """Returns an OS definition.
548

549
  This is a single-node call.
550

551
  """
552
  c = Client("os_get", [name])
553
  c.connect(node)
554
  c.run()
555
  result = c.getresult().get(node, False)
556
  if isinstance(result, dict):
557
    return objects.OS.FromDict(result)
558
  else:
559
    return result
560

    
561

    
562
def call_hooks_runner(node_list, hpath, phase, env):
563
  """Call the hooks runner.
564

565
  Args:
566
    - op: the OpCode instance
567
    - env: a dictionary with the environment
568

569
  This is a multi-node call.
570

571
  """
572
  params = [hpath, phase, env]
573
  c = Client("hooks_runner", params)
574
  c.connect_list(node_list)
575
  c.run()
576
  result = c.getresult()
577
  return result
578

    
579

    
580
def call_iallocator_runner(node, name, idata):
581
  """Call an iallocator on a remote node
582

583
  Args:
584
    - name: the iallocator name
585
    - input: the json-encoded input string
586

587
  This is a single-node call.
588

589
  """
590
  params = [name, idata]
591
  c = Client("iallocator_runner", params)
592
  c.connect(node)
593
  c.run()
594
  result = c.getresult().get(node, False)
595
  return result
596

    
597

    
598
def call_blockdev_snapshot(node, cf_bdev):
599
  """Request a snapshot of the given block device.
600

601
  This is a single-node call.
602

603
  """
604
  c = Client("blockdev_snapshot", [cf_bdev.ToDict()])
605
  c.connect(node)
606
  c.run()
607
  return c.getresult().get(node, False)
608

    
609

    
610
def call_snapshot_export(node, snap_bdev, dest_node, instance):
611
  """Request the export of a given snapshot.
612

613
  This is a single-node call.
614

615
  """
616
  params = [snap_bdev.ToDict(), dest_node, instance.ToDict()]
617
  c = Client("snapshot_export", params)
618
  c.connect(node)
619
  c.run()
620
  return c.getresult().get(node, False)
621

    
622

    
623
def call_finalize_export(node, instance, snap_disks):
624
  """Request the completion of an export operation.
625

626
  This writes the export config file, etc.
627

628
  This is a single-node call.
629

630
  """
631
  flat_disks = []
632
  for disk in snap_disks:
633
    flat_disks.append(disk.ToDict())
634
  params = [instance.ToDict(), flat_disks]
635
  c = Client("finalize_export", params)
636
  c.connect(node)
637
  c.run()
638
  return c.getresult().get(node, False)
639

    
640

    
641
def call_export_info(node, path):
642
  """Queries the export information in a given path.
643

644
  This is a single-node call.
645

646
  """
647
  c = Client("export_info", [path])
648
  c.connect(node)
649
  c.run()
650
  result = c.getresult().get(node, False)
651
  if not result:
652
    return result
653
  return objects.SerializableConfigParser.Loads(str(result))
654

    
655

    
656
def call_instance_os_import(node, inst, osdev, swapdev, src_node, src_image):
657
  """Request the import of a backup into an instance.
658

659
  This is a single-node call.
660

661
  """
662
  params = [inst.ToDict(), osdev, swapdev, src_node, src_image]
663
  c = Client("instance_os_import", params)
664
  c.connect(node)
665
  c.run()
666
  return c.getresult().get(node, False)
667

    
668

    
669
def call_export_list(node_list):
670
  """Gets the stored exports list.
671

672
  This is a multi-node call.
673

674
  """
675
  c = Client("export_list", [])
676
  c.connect_list(node_list)
677
  c.run()
678
  result = c.getresult()
679
  return result
680

    
681

    
682
def call_export_remove(node, export):
683
  """Requests removal of a given export.
684

685
  This is a single-node call.
686

687
  """
688
  c = Client("export_remove", [export])
689
  c.connect(node)
690
  c.run()
691
  return c.getresult().get(node, False)
692

    
693

    
694
def call_node_leave_cluster(node):
695
  """Requests a node to clean the cluster information it has.
696

697
  This will remove the configuration information from the ganeti data
698
  dir.
699

700
  This is a single-node call.
701

702
  """
703
  c = Client("node_leave_cluster", [])
704
  c.connect(node)
705
  c.run()
706
  return c.getresult().get(node, False)
707

    
708

    
709
def call_node_volumes(node_list):
710
  """Gets all volumes on node(s).
711

712
  This is a multi-node call.
713

714
  """
715
  c = Client("node_volumes", [])
716
  c.connect_list(node_list)
717
  c.run()
718
  return c.getresult()
719

    
720

    
721
def call_test_delay(node_list, duration):
722
  """Sleep for a fixed time on given node(s).
723

724
  This is a multi-node call.
725

726
  """
727
  c = Client("test_delay", [duration])
728
  c.connect_list(node_list)
729
  c.run()
730
  return c.getresult()
731

    
732

    
733
def call_file_storage_dir_create(node, file_storage_dir):
734
  """Create the given file storage directory.
735

736
  This is a single-node call.
737

738
  """
739
  c = Client("file_storage_dir_create", [file_storage_dir])
740
  c.connect(node)
741
  c.run()
742
  return c.getresult().get(node, False)
743

    
744

    
745
def call_file_storage_dir_remove(node, file_storage_dir):
746
  """Remove the given file storage directory.
747

748
  This is a single-node call.
749

750
  """
751
  c = Client("file_storage_dir_remove", [file_storage_dir])
752
  c.connect(node)
753
  c.run()
754
  return c.getresult().get(node, False)
755

    
756

    
757
def call_file_storage_dir_rename(node, old_file_storage_dir,
758
                                 new_file_storage_dir):
759
  """Rename file storage directory.
760

761
  This is a single-node call.
762

763
  """
764
  c = Client("file_storage_dir_rename",
765
             [old_file_storage_dir, new_file_storage_dir])
766
  c.connect(node)
767
  c.run()
768
  return c.getresult().get(node, False)
769