Statistics
| Branch: | Tag: | Revision:

root / lib / rapi / client.py @ 95ab4de9

History | View | Annotate | Download (23.1 kB)

1
#
2
#
3

    
4
# Copyright (C) 2010 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Ganeti RAPI client."""
23

    
24
import httplib
25
import httplib2
26
import simplejson
27
import socket
28
import urllib
29
from OpenSSL import SSL
30
from OpenSSL import crypto
31

    
32

    
33
HTTP_DELETE = "DELETE"
34
HTTP_GET = "GET"
35
HTTP_PUT = "PUT"
36
HTTP_POST = "POST"
37
REPLACE_DISK_PRI = "replace_on_primary"
38
REPLACE_DISK_SECONDARY = "replace_on_secondary"
39
REPLACE_DISK_CHG = "replace_new_secondary"
40
REPLACE_DISK_AUTO = "replace_auto"
41
VALID_REPLACEMENT_MODES = frozenset([
42
    REPLACE_DISK_PRI, REPLACE_DISK_SECONDARY, REPLACE_DISK_CHG,
43
    REPLACE_DISK_AUTO
44
    ])
45
VALID_NODE_ROLES = frozenset([
46
    "drained", "master", "master-candidate", "offline", "regular"
47
    ])
48
VALID_STORAGE_TYPES = frozenset(["file", "lvm-pv", "lvm-vg"])
49

    
50

    
51
class Error(Exception):
52
  """Base error class for this module.
53

54
  """
55
  pass
56

    
57

    
58
class CertificateError(Error):
59
  """Raised when a problem is found with the SSL certificate.
60

61
  """
62
  pass
63

    
64

    
65
class GanetiApiError(Error):
66
  """Generic error raised from Ganeti API.
67

68
  """
69
  pass
70

    
71

    
72
class InvalidReplacementMode(Error):
73
  """Raised when an invalid disk replacement mode is attempted.
74

75
  """
76
  pass
77

    
78

    
79
class InvalidStorageType(Error):
80
  """Raised when an invalid storage type is used.
81

82
  """
83
  pass
84

    
85

    
86
class InvalidNodeRole(Error):
87
  """Raised when an invalid node role is used.
88

89
  """
90
  pass
91

    
92

    
93
class GanetiRapiClient(object):
94
  """Ganeti RAPI client.
95

96
  """
97

    
98
  USER_AGENT = "Ganeti RAPI Client"
99

    
100
  def __init__(self, master_hostname, port=5080, username=None, password=None,
101
               ssl_cert=None):
102
    """Constructor.
103

104
    @type master_hostname: str
105
    @param master_hostname: the ganeti cluster master to interact with
106
    @type port: int
107
    @param port: the port on which the RAPI is running. (default is 5080)
108
    @type username: str
109
    @param username: the username to connect with
110
    @type password: str
111
    @param password: the password to connect with
112
    @type ssl_cert: str or None
113
    @param ssl_cert: the expected SSL certificate. if None, SSL certificate
114
        will not be verified
115

116
    """
117
    self._master_hostname = master_hostname
118
    self._port = port
119
    if ssl_cert:
120
      _VerifyCertificate(self._master_hostname, self._port, ssl_cert)
121

    
122
    self._http = httplib2.Http()
123
    self._headers = {
124
        "Accept": "text/plain",
125
        "Content-type": "application/x-www-form-urlencoded",
126
        "User-Agent": self.USER_AGENT}
127
    self._version = None
128
    if username and password:
129
      self._http.add_credentials(username, password)
130

    
131
  def _MakeUrl(self, path, query=None, prepend_version=True):
132
    """Constructs the URL to pass to the HTTP client.
133

134
    @type path: str
135
    @param path: HTTP URL path
136
    @type query: list of two-tuples
137
    @param query: query arguments to pass to urllib.urlencode
138
    @type prepend_version: bool
139
    @param prepend_version: whether to automatically fetch and prepend the
140
        Ganeti version to the URL path
141

142
    @rtype:  str
143
    @return: URL path
144

145
    """
146
    if prepend_version:
147
      if not self._version:
148
        self._GetVersionInternal()
149
      path = "/%d%s" % (self._version, path)
150

    
151
    return "https://%(host)s:%(port)d%(path)s?%(query)s" % {
152
        "host": self._master_hostname,
153
        "port": self._port,
154
        "path": path,
155
        "query": urllib.urlencode(query or [])}
156

    
157
  def _SendRequest(self, method, path, query=None, content=None,
158
                   prepend_version=True):
159
    """Sends an HTTP request.
160

161
    This constructs a full URL, encodes and decodes HTTP bodies, and
162
    handles invalid responses in a pythonic way.
163

164
    @type method: str
165
    @param method: HTTP method to use
166
    @type path: str
167
    @param path: HTTP URL path
168
    @type query: list of two-tuples
169
    @param query: query arguments to pass to urllib.urlencode
170
    @type content: str or None
171
    @param content: HTTP body content
172
    @type prepend_version: bool
173
    @param prepend_version: whether to automatically fetch and prepend the
174
        Ganeti version to the URL path
175

176
    @rtype: str
177
    @return: JSON-Decoded response
178

179
    @raises GanetiApiError: If an invalid response is returned
180

181
    """
182
    if content:
183
      simplejson.JSONEncoder(sort_keys=True).encode(content)
184

    
185
    url = self._MakeUrl(path, query, prepend_version)
186
    resp_headers, resp_content = self._http.request(
187
        url, method, body=content, headers=self._headers)
188

    
189
    if resp_content:
190
      resp_content = simplejson.loads(resp_content)
191

    
192
    # TODO: Are there other status codes that are valid? (redirect?)
193
    if resp_headers.status != 200:
194
      if isinstance(resp_content, dict):
195
        msg = ("%s %s: %s" %
196
            (resp_content["code"], resp_content["message"],
197
             resp_content["explain"]))
198
      else:
199
        msg = resp_content
200
      raise GanetiApiError(msg)
201

    
202
    return resp_content
203

    
204
  def _GetVersionInternal(self):
205
    """Gets the Remote API version running on the cluster.
206

207
    @rtype: int
208
    @return: Ganeti version
209

210
    """
211
    self._version = self._SendRequest(HTTP_GET, "/version",
212
                                      prepend_version=False)
213
    return self._version
214

    
215
  def GetVersion(self):
216
    """Gets the ganeti version running on the cluster.
217

218
    @rtype: int
219
    @return: Ganeti version
220

221
    """
222
    if not self._version:
223
      self._GetVersionInternal()
224
    return self._version
225

    
226
  def GetOperatingSystems(self):
227
    """Gets the Operating Systems running in the Ganeti cluster.
228

229
    @rtype: list of str
230
    @return: operating systems
231

232
    """
233
    return self._SendRequest(HTTP_GET, "/os")
234

    
235
  def GetInfo(self):
236
    """Gets info about the cluster.
237

238
    @rtype: dict
239
    @return: information about the cluster
240

241
    """
242
    return self._SendRequest(HTTP_GET, "/info")
243

    
244
  def GetClusterTags(self):
245
    """Gets the cluster tags.
246

247
    @rtype: list of str
248
    @return: cluster tags
249

250
    """
251
    return self._SendRequest(HTTP_GET, "/tags")
252

    
253
  def AddClusterTags(self, tags, dry_run=False):
254
    """Adds tags to the cluster.
255

256
    @type tags: list of str
257
    @param tags: tags to add to the cluster
258
    @type dry_run: bool
259
    @param dry_run: whether to perform a dry run
260

261
    @rtype: int
262
    @return: job id
263

264
    """
265
    query = [("tag", t) for t in tags]
266
    if dry_run:
267
      query.append(("dry-run", 1))
268

    
269
    self._SendRequest(HTTP_PUT, "/tags", query)
270

    
271
  def DeleteClusterTags(self, tags, dry_run=False):
272
    """Deletes tags from the cluster.
273

274
    @type tags: list of str
275
    @param tags: tags to delete
276
    @type dry_run: bool
277
    @param dry_run: whether to perform a dry run
278

279
    """
280
    query = [("tag", t) for t in tags]
281
    if dry_run:
282
      query.append(("dry-run", 1))
283

    
284
    self._SendRequest(HTTP_DELETE, "/tags", query)
285

    
286
  def GetInstances(self, bulk=False):
287
    """Gets information about instances on the cluster.
288

289
    @type bulk: bool
290
    @param bulk: whether to return all information about all instances
291

292
    @rtype: list of dict or list of str
293
    @return: if bulk is True, info about the instances, else a list of instances
294

295
    """
296
    query = []
297
    if bulk:
298
      query.append(("bulk", 1))
299

    
300
    instances = self._SendRequest(HTTP_GET, "/instances", query)
301
    if bulk:
302
      return instances
303
    else:
304
      return [i["id"] for i in instances]
305

    
306

    
307
  def GetInstanceInfo(self, instance):
308
    """Gets information about an instance.
309

310
    @type instance: str
311
    @param instance: instance whose info to return
312

313
    @rtype: dict
314
    @return: info about the instance
315

316
    """
317
    return self._SendRequest(HTTP_GET, "/instances/%s" % instance)
318

    
319
  def CreateInstance(self, dry_run=False):
320
    """Creates a new instance.
321

322
    @type dry_run: bool
323
    @param dry_run: whether to perform a dry run
324

325
    @rtype: int
326
    @return: job id
327

328
    """
329
    # TODO: Pass arguments needed to actually create an instance.
330
    query = []
331
    if dry_run:
332
      query.append(("dry-run", 1))
333

    
334
    return self._SendRequest(HTTP_POST, "/instances", query)
335

    
336
  def DeleteInstance(self, instance, dry_run=False):
337
    """Deletes an instance.
338

339
    @type instance: str
340
    @param instance: the instance to delete
341

342
    """
343
    query = []
344
    if dry_run:
345
      query.append(("dry-run", 1))
346

    
347
    self._SendRequest(HTTP_DELETE, "/instances/%s" % instance, query)
348

    
349
  def GetInstanceTags(self, instance):
350
    """Gets tags for an instance.
351

352
    @type instance: str
353
    @param instance: instance whose tags to return
354

355
    @rtype: list of str
356
    @return: tags for the instance
357

358
    """
359
    return self._SendRequest(HTTP_GET, "/instances/%s/tags" % instance)
360

    
361
  def AddInstanceTags(self, instance, tags, dry_run=False):
362
    """Adds tags to an instance.
363

364
    @type instance: str
365
    @param instance: instance to add tags to
366
    @type tags: list of str
367
    @param tags: tags to add to the instance
368
    @type dry_run: bool
369
    @param dry_run: whether to perform a dry run
370

371
    @rtype: int
372
    @return: job id
373

374
    """
375
    query = [("tag", t) for t in tags]
376
    if dry_run:
377
      query.append(("dry-run", 1))
378

    
379
    self._SendRequest(HTTP_PUT, "/instances/%s/tags" % instance, query)
380

    
381
  def DeleteInstanceTags(self, instance, tags, dry_run=False):
382
    """Deletes tags from an instance.
383

384
    @type instance: str
385
    @param instance: instance to delete tags from
386
    @type tags: list of str
387
    @param tags: tags to delete
388
    @type dry_run: bool
389
    @param dry_run: whether to perform a dry run
390

391
    """
392
    query = [("tag", t) for t in tags]
393
    if dry_run:
394
      query.append(("dry-run", 1))
395

    
396
    self._SendRequest(HTTP_DELETE, "/instances/%s/tags" % instance, query)
397

    
398
  def RebootInstance(self, instance, reboot_type=None, ignore_secondaries=None,
399
                     dry_run=False):
400
    """Reboots an instance.
401

402
    @type instance: str
403
    @param instance: instance to rebot
404
    @type reboot_type: str
405
    @param reboot_type: one of: hard, soft, full
406
    @type ignore_secondaries: bool
407
    @param ignore_secondaries: if True, ignores errors for the secondary node
408
        while re-assembling disks (in hard-reboot mode only)
409
    @type dry_run: bool
410
    @param dry_run: whether to perform a dry run
411

412
    """
413
    query = []
414
    if reboot_type:
415
      query.append(("type", reboot_type))
416
    if ignore_secondaries is not None:
417
      query.append(("ignore_secondaries", ignore_secondaries))
418
    if dry_run:
419
      query.append(("dry-run", 1))
420

    
421
    self._SendRequest(HTTP_POST, "/instances/%s/reboot" % instance, query)
422

    
423
  def ShutdownInstance(self, instance, dry_run=False):
424
    """Shuts down an instance.
425

426
    @type instance: str
427
    @param instance: the instance to shut down
428
    @type dry_run: bool
429
    @param dry_run: whether to perform a dry run
430

431
    """
432
    query = []
433
    if dry_run:
434
      query.append(("dry-run", 1))
435

    
436
    self._SendRequest(HTTP_PUT, "/instances/%s/shutdown" % instance, query)
437

    
438
  def StartupInstance(self, instance, dry_run=False):
439
    """Starts up an instance.
440

441
    @type instance: str
442
    @param instance: the instance to start up
443
    @type dry_run: bool
444
    @param dry_run: whether to perform a dry run
445

446
    """
447
    query = []
448
    if dry_run:
449
      query.append(("dry-run", 1))
450

    
451
    self._SendRequest(HTTP_PUT, "/instances/%s/startup" % instance, query)
452

    
453
  def ReinstallInstance(self, instance, os, no_startup=False):
454
    """Reinstalls an instance.
455

456
    @type instance: str
457
    @param instance: the instance to reinstall
458
    @type os: str
459
    @param os: the os to reinstall
460
    @type no_startup: bool
461
    @param no_startup: whether to start the instance automatically
462

463
    """
464
    query = [("os", os)]
465
    if no_startup:
466
      query.append(("nostartup", 1))
467
    self._SendRequest(HTTP_POST, "/instances/%s/reinstall" % instance, query)
468

    
469
  def ReplaceInstanceDisks(self, instance, disks, mode="replace_auto",
470
                           remote_node=None, iallocator="hail", dry_run=False):
471
    """Replaces disks on an instance.
472

473
    @type instance: str
474
    @param instance: instance whose disks to replace
475
    @type disks: list of str
476
    @param disks: disks to replace
477
    @type mode: str
478
    @param mode: replacement mode to use. defaults to replace_auto
479
    @type remote_node: str or None
480
    @param remote_node: new secondary node to use (for use with
481
        replace_new_secondary mdoe)
482
    @type iallocator: str or None
483
    @param iallocator: instance allocator plugin to use (for use with
484
        replace_auto mdoe).  default is hail
485
    @type dry_run: bool
486
    @param dry_run: whether to perform a dry run
487

488
    @rtype: int
489
    @return: job id
490

491
    @raises InvalidReplacementMode: If an invalid disk replacement mode is given
492
    @raises GanetiApiError: If no secondary node is given with a non-auto
493
        replacement mode is requested.
494

495
    """
496
    if mode not in VALID_REPLACEMENT_MODES:
497
      raise InvalidReplacementMode("%s is not a valid disk replacement mode.",
498
                                   mode)
499

    
500
    query = [("mode", mode), ("disks", ",".join(disks))]
501

    
502
    if mode is REPLACE_DISK_AUTO:
503
      query.append(("iallocator", iallocator))
504
    elif mode is REPLACE_DISK_SECONDARY:
505
      if remote_node is None:
506
        raise GanetiApiError("You must supply a new secondary node.")
507
      query.append(("remote_node", remote_node))
508

    
509
    if dry_run:
510
      query.append(("dry-run", 1))
511

    
512
    return self._SendRequest(HTTP_POST,
513
                             "/instances/%s/replace-disks" % instance, query)
514

    
515
  def GetJobs(self):
516
    """Gets all jobs for the cluster.
517

518
    @rtype: list of int
519
    @return: job ids for the cluster
520

521
    """
522
    return [int(j["id"]) for j in self._SendRequest(HTTP_GET, "/jobs")]
523

    
524
  def GetJobStatus(self, job_id):
525
    """Gets the status of a job.
526

527
    @type job_id: int
528
    @param job_id: job id whose status to query
529

530
    @rtype: dict
531
    @return: job status
532

533
    """
534
    return self._SendRequest(HTTP_GET, "/jobs/%d" % job_id)
535

    
536
  def DeleteJob(self, job_id, dry_run=False):
537
    """Deletes a job.
538

539
    @type job_id: int
540
    @param job_id: id of the job to delete
541
    @type dry_run: bool
542
    @param dry_run: whether to perform a dry run
543

544
    """
545
    query = []
546
    if dry_run:
547
      query.append(("dry-run", 1))
548

    
549
    self._SendRequest(HTTP_DELETE, "/jobs/%d" % job_id, query)
550

    
551
  def GetNodes(self, bulk=False):
552
    """Gets all nodes in the cluster.
553

554
    @type bulk: bool
555
    @param bulk: whether to return all information about all instances
556

557
    @rtype: list of dict or str
558
    @return: if bulk is true, info about nodes in the cluster,
559
        else list of nodes in the cluster
560

561
    """
562
    query = []
563
    if bulk:
564
      query.append(("bulk", 1))
565

    
566
    nodes = self._SendRequest(HTTP_GET, "/nodes", query)
567
    if bulk:
568
      return nodes
569
    else:
570
      return [n["id"] for n in nodes]
571

    
572
  def GetNodeInfo(self, node):
573
    """Gets information about a node.
574

575
    @type node: str
576
    @param node: node whose info to return
577

578
    @rtype: dict
579
    @return: info about the node
580

581
    """
582
    return self._SendRequest(HTTP_GET, "/nodes/%s" % node)
583

    
584
  def EvacuateNode(self, node, iallocator=None, remote_node=None,
585
                   dry_run=False):
586
    """Evacuates instances from a Ganeti node.
587

588
    @type node: str
589
    @param node: node to evacuate
590
    @type iallocator: str or None
591
    @param iallocator: instance allocator to use
592
    @type remote_node: str
593
    @param remote_node: node to evaucate to
594
    @type dry_run: bool
595
    @param dry_run: whether to perform a dry run
596

597
    @rtype: int
598
    @return: job id
599

600
    @raises GanetiApiError: if an iallocator and remote_node are both specified
601

602
    """
603
    query = []
604
    if iallocator and remote_node:
605
      raise GanetiApiError("Only one of iallocator or remote_node can be used.")
606

    
607
    if iallocator:
608
      query.append(("iallocator", iallocator))
609
    if remote_node:
610
      query.append(("remote_node", remote_node))
611
    if dry_run:
612
      query.append(("dry-run", 1))
613

    
614
    return self._SendRequest(HTTP_POST, "/nodes/%s/evacuate" % node, query)
615

    
616
  def MigrateNode(self, node, live=True, dry_run=False):
617
    """Migrates all primary instances from a node.
618

619
    @type node: str
620
    @param node: node to migrate
621
    @type live: bool
622
    @param live: whether to use live migration
623
    @type dry_run: bool
624
    @param dry_run: whether to perform a dry run
625

626
    @rtype: int
627
    @return: job id
628

629
    """
630
    query = []
631
    if live:
632
      query.append(("live", 1))
633
    if dry_run:
634
      query.append(("dry-run", 1))
635

    
636
    return self._SendRequest(HTTP_POST, "/nodes/%s/migrate" % node, query)
637

    
638
  def GetNodeRole(self, node):
639
    """Gets the current role for a node.
640

641
    @type node: str
642
    @param node: node whose role to return
643

644
    @rtype: str
645
    @return: the current role for a node
646

647
    """
648
    return self._SendRequest(HTTP_GET, "/nodes/%s/role" % node)
649

    
650
  def SetNodeRole(self, node, role, force=False):
651
    """Sets the role for a node.
652

653
    @type node: str
654
    @param node: the node whose role to set
655
    @type role: str
656
    @param role: the role to set for the node
657
    @type force: bool
658
    @param force: whether to force the role change
659

660
    @rtype: int
661
    @return: job id
662

663
    @raise InvalidNodeRole: If an invalid node role is specified
664

665
    """
666
    if role not in VALID_NODE_ROLES:
667
      raise InvalidNodeRole("%s is not a valid node role.", role)
668

    
669
    query = [("force", force)]
670
    return self._SendRequest(HTTP_PUT, "/nodes/%s/role" % node, query,
671
                             content=role)
672

    
673
  def GetNodeStorageUnits(self, node, storage_type, output_fields):
674
    """Gets the storage units for a node.
675

676
    @type node: str
677
    @param node: the node whose storage units to return
678
    @type storage_type: str
679
    @param storage_type: storage type whose units to return
680
    @type output_fields: str
681
    @param output_fields: storage type fields to return
682

683
    @rtype: int
684
    @return: job id where results can be retrieved
685

686
    @raise InvalidStorageType: If an invalid storage type is specified
687

688
    """
689
    # TODO: Add default for storage_type & output_fields
690
    if storage_type not in VALID_STORAGE_TYPES:
691
      raise InvalidStorageType("%s is an invalid storage type.", storage_type)
692

    
693
    query = [("storage_type", storage_type), ("output_fields", output_fields)]
694
    return self._SendRequest(HTTP_GET, "/nodes/%s/storage" % node, query)
695

    
696
  def ModifyNodeStorageUnits(self, node, storage_type, name, allocatable=True):
697
    """Modifies parameters of storage units on the node.
698

699
    @type node: str
700
    @param node: node whose storage units to modify
701
    @type storage_type: str
702
    @param storage_type: storage type whose units to modify
703
    @type name: str
704
    @param name: name of the storage unit
705
    @type allocatable: bool
706
    @param allocatable: TODO: Document me
707

708
    @rtype: int
709
    @return: job id
710

711
    @raise InvalidStorageType: If an invalid storage type is specified
712

713
    """
714
    if storage_type not in VALID_STORAGE_TYPES:
715
      raise InvalidStorageType("%s is an invalid storage type.", storage_type)
716

    
717
    query = [
718
        ("storage_type", storage_type), ("name", name),
719
        ("allocatable", allocatable)
720
        ]
721
    return self._SendRequest(HTTP_PUT, "/nodes/%s/storage/modify" % node, query)
722

    
723
  def RepairNodeStorageUnits(self, node, storage_type, name):
724
    """Repairs a storage unit on the node.
725

726
    @type node: str
727
    @param node: node whose storage units to repair
728
    @type storage_type: str
729
    @param storage_type: storage type to repair
730
    @type name: str
731
    @param name: name of the storage unit to repair
732

733
    @rtype: int
734
    @return: job id
735

736
    @raise InvalidStorageType: If an invalid storage type is specified
737

738
    """
739
    if storage_type not in VALID_STORAGE_TYPES:
740
      raise InvalidStorageType("%s is an invalid storage type.", storage_type)
741

    
742
    query = [("storage_type", storage_type), ("name", name)]
743
    return self._SendRequest(HTTP_PUT, "/nodes/%s/storage/repair" % node, query)
744

    
745
  def GetNodeTags(self, node):
746
    """Gets the tags for a node.
747

748
    @type node: str
749
    @param node: node whose tags to return
750

751
    @rtype: list of str
752
    @return: tags for the node
753

754
    """
755
    return self._SendRequest(HTTP_GET, "/nodes/%s/tags" % node)
756

    
757
  def AddNodeTags(self, node, tags, dry_run=False):
758
    """Adds tags to a node.
759

760
    @type node: str
761
    @param node: node to add tags to
762
    @type tags: list of str
763
    @param tags: tags to add to the node
764
    @type dry_run: bool
765
    @param dry_run: whether to perform a dry run
766

767
    @rtype: int
768
    @return: job id
769

770
    """
771
    query = [("tag", t) for t in tags]
772
    if dry_run:
773
      query.append(("dry-run", 1))
774

    
775
    return self._SendRequest(HTTP_PUT, "/nodes/%s/tags" % node, query,
776
                             content=tags)
777

    
778
  def DeleteNodeTags(self, node, tags, dry_run=False):
779
    """Delete tags from a node.
780

781
    @type node: str
782
    @param node: node to remove tags from
783
    @type tags: list of str
784
    @param tags: tags to remove from the node
785
    @type dry_run: bool
786
    @param dry_run: whether to perform a dry run
787

788
    @rtype: int
789
    @return: job id
790

791
    """
792
    query = [("tag", t) for t in tags]
793
    if dry_run:
794
      query.append(("dry-run", 1))
795

    
796
    return self._SendRequest(HTTP_DELETE, "/nodes/%s/tags" % node, query)
797

    
798

    
799
class HTTPSConnectionOpenSSL(httplib.HTTPSConnection):
800
  """HTTPS Connection handler that verifies the SSL certificate.
801

802
  """
803

    
804
  # pylint: disable-msg=W0142
805
  def __init__(self, *args, **kwargs):
806
    """Constructor.
807

808
    """
809
    httplib.HTTPSConnection.__init__(self, *args, **kwargs)
810

    
811
    self._ssl_cert = None
812
    if self.cert_file:
813
      f = open(self.cert_file, "r")
814
      self._ssl_cert = crypto.load_certificate(crypto.FILETYPE_PEM, f.read())
815
      f.close()
816

    
817
  # pylint: disable-msg=W0613
818
  def _VerifySSLCertCallback(self, conn, cert, errnum, errdepth, ok):
819
    """Verifies the SSL certificate provided by the peer.
820

821
    """
822
    return (self._ssl_cert.digest("sha1") == cert.digest("sha1") and
823
            self._ssl_cert.digest("md5") == cert.digest("md5"))
824

    
825
  def connect(self):
826
    """Connect to the server specified when the object was created.
827

828
    This ensures that SSL certificates are verified.
829

830
    """
831
    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
832
    ctx = SSL.Context(SSL.SSLv23_METHOD)
833
    ctx.set_options(SSL.OP_NO_SSLv2)
834
    ctx.use_certificate(self._ssl_cert)
835
    ctx.set_verify(SSL.VERIFY_PEER | SSL.VERIFY_FAIL_IF_NO_PEER_CERT,
836
                   self._VerifySSLCertCallback)
837

    
838
    ssl = SSL.Connection(ctx, sock)
839
    ssl.connect((self.host, self.port))
840
    self.sock = httplib.FakeSocket(sock, ssl)
841

    
842

    
843
def _VerifyCertificate(hostname, port, cert_file):
844
  """Verifies the SSL certificate for the given host/port.
845

846
  @type hostname: str
847
  @param hostname: the ganeti cluster master whose certificate to verify
848
  @type port: int
849
  @param port: the port on which the RAPI is running
850
  @type cert_file: str
851
  @param cert_file: filename of the expected SSL certificate
852

853
  @raises CertificateError: If an invalid SSL certificate is found
854

855
  """
856
  https = HTTPSConnectionOpenSSL(hostname, port, cert_file=cert_file)
857
  try:
858
    try:
859
      https.request(HTTP_GET, "/version")
860
    except (crypto.Error, SSL.Error):
861
      raise CertificateError("Invalid SSL certificate.")
862
  finally:
863
    https.close()