Statistics
| Branch: | Tag: | Revision:

root / lib / luxi.py @ 7f97eb93

History | View | Annotate | Download (17 kB)

1
#
2
#
3

    
4
# Copyright (C) 2006, 2007, 2011, 2012, 2014 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Module for the unix socket protocol
23

24
This module implements the local unix socket protocol. You only need
25
this module and the opcodes module in the client program in order to
26
communicate with the master.
27

28
The module is also used by the master daemon.
29

30
"""
31

    
32
import socket
33
import collections
34
import time
35
import errno
36
import logging
37

    
38
from ganeti import compat
39
from ganeti import serializer
40
from ganeti import constants
41
from ganeti import errors
42
from ganeti import utils
43
from ganeti import objects
44
from ganeti import pathutils
45

    
46

    
47
# Keys of the top-level dictionary making up a serialized LUXI message
KEY_METHOD = "method"
KEY_ARGS = "args"
KEY_SUCCESS = "success"
KEY_RESULT = "result"
KEY_VERSION = "version"
52

    
53
# Method names of the individual LUXI requests, as sent in the "method"
# field of a request
REQ_SUBMIT_JOB = "SubmitJob"
REQ_SUBMIT_MANY_JOBS = "SubmitManyJobs"
REQ_WAIT_FOR_JOB_CHANGE = "WaitForJobChange"
REQ_CANCEL_JOB = "CancelJob"
REQ_ARCHIVE_JOB = "ArchiveJob"
REQ_CHANGE_JOB_PRIORITY = "ChangeJobPriority"
REQ_AUTO_ARCHIVE_JOBS = "AutoArchiveJobs"
REQ_QUERY = "Query"
REQ_QUERY_FIELDS = "QueryFields"
REQ_QUERY_JOBS = "QueryJobs"
REQ_QUERY_INSTANCES = "QueryInstances"
REQ_QUERY_NODES = "QueryNodes"
REQ_QUERY_GROUPS = "QueryGroups"
REQ_QUERY_NETWORKS = "QueryNetworks"
REQ_QUERY_EXPORTS = "QueryExports"
REQ_QUERY_CONFIG_VALUES = "QueryConfigValues"
REQ_QUERY_CLUSTER_INFO = "QueryClusterInfo"
REQ_QUERY_TAGS = "QueryTags"
REQ_SET_DRAIN_FLAG = "SetDrainFlag"
REQ_SET_WATCHER_PAUSE = "SetWatcherPause"
73

    
74
#: Collection of all LUXI request names defined by this module
# NOTE(review): compat.UniqueFrozenset presumably rejects duplicate
# entries -- confirm against ganeti.compat
REQ_ALL = compat.UniqueFrozenset([
  REQ_ARCHIVE_JOB,
  REQ_AUTO_ARCHIVE_JOBS,
  REQ_CANCEL_JOB,
  REQ_CHANGE_JOB_PRIORITY,
  REQ_QUERY,
  REQ_QUERY_CLUSTER_INFO,
  REQ_QUERY_CONFIG_VALUES,
  REQ_QUERY_EXPORTS,
  REQ_QUERY_FIELDS,
  REQ_QUERY_GROUPS,
  REQ_QUERY_INSTANCES,
  REQ_QUERY_JOBS,
  REQ_QUERY_NODES,
  REQ_QUERY_NETWORKS,
  REQ_QUERY_TAGS,
  REQ_SET_DRAIN_FLAG,
  REQ_SET_WATCHER_PAUSE,
  REQ_SUBMIT_JOB,
  REQ_SUBMIT_MANY_JOBS,
  REQ_WAIT_FOR_JOB_CHANGE,
  ])
97

    
98
# Default connect timeout and default read/write timeout, in seconds
DEF_CTMO = 10
DEF_RWTO = 60

# WaitForJobChange timeout; derived from the read/write timeout so that
# it stays below half of it
WFJC_TIMEOUT = (DEF_RWTO - 1) / 2
103

    
104

    
105
class ProtocolError(errors.LuxiError):
  """Signals a problem at the level of the LUXI protocol itself."""
107

    
108

    
109
class ConnectionClosedError(ProtocolError):
  """Signals that the connection has been closed."""
111

    
112

    
113
class TimeoutError(ProtocolError):
  """Signals that an operation ran into its timeout."""
115

    
116

    
117
class RequestError(ProtocolError):
  """Signals a failed request.

  The failure concerns the format of the request or the way it was
  handled, as opposed to (e.g.) a failure to start up an instance.

  Typical situations leading to this exception:
    - a job could not be submitted because its data was wrong
    - a query could not be run because required fields were missing

  """
128

    
129

    
130
class NoMasterError(ProtocolError):
  """Signals that the master is unreachable.

  Either the master daemon isn't running or its socket has been
  removed.

  """
137

    
138

    
139
class PermissionError(ProtocolError):
  """Signals missing permissions for connecting to the master socket.

  The user running the client lacks the rights needed for the socket.

  """
145

    
146

    
147
class Transport:
  """Low-level transport class.

  This is used on the client side.

  This could be replaced by any other class that provides the same
  semantics to the Client. This means:
    - can send messages and receive messages
    - safe for multithreading

  """

  def __init__(self, address, timeouts=None):
    """Constructor for the Transport class.

    Arguments:
      - address: a valid address for the used transport class
      - timeouts: a list of timeouts, to be used on connect and read/write

    There are two timeouts used since we might want to wait for a long
    time for a response, but the connect timeout should be lower.

    If not passed, we use a default of 10 and respectively 60 seconds.

    Note that on reading data, since the timeout applies to an
    individual receive, it might be that the total duration is longer
    than timeout value passed (we make a hard limit at twice the read
    timeout).

    Any exception raised while connecting is passed on to the caller
    after the socket has been closed again.

    """
    self.address = address
    if timeouts is None:
      self._ctimeout, self._rwtimeout = DEF_CTMO, DEF_RWTO
    else:
      self._ctimeout, self._rwtimeout = timeouts

    self.socket = None
    # Data received so far which doesn't yet form a complete message
    self._buffer = ""
    # Complete messages not yet handed out by Recv
    self._msgs = collections.deque()

    try:
      self.socket = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)

      # Try to connect
      try:
        utils.Retry(self._Connect, 1.0, self._ctimeout,
                    args=(self.socket, address, self._ctimeout))
      except utils.RetryTimeout:
        raise TimeoutError("Connect timed out")

      self.socket.settimeout(self._rwtimeout)
    except Exception: # pylint: disable=W0703
      # Never leave an open socket behind, no matter what went wrong;
      # this includes TimeoutError and PermissionError, not just
      # socket.error and NoMasterError
      if self.socket is not None:
        self.socket.close()
      self.socket = None
      raise

  @staticmethod
  def _Connect(sock, address, timeout):
    """Connect the socket, translating low-level errors.

    @param sock: Socket to connect
    @param address: Address to connect to
    @param timeout: Timeout set on the socket before connecting
    @raise TimeoutError: If the connect attempt times out
    @raise NoMasterError: On ENOENT or ECONNREFUSED
    @raise PermissionError: On EPERM or EACCES
    @raise utils.RetryAgain: On EAGAIN

    """
    sock.settimeout(timeout)
    try:
      sock.connect(address)
    except socket.timeout as err:
      raise TimeoutError("Connect timed out: %s" % str(err))
    except socket.error as err:
      error_code = err.args[0]
      if error_code in (errno.ENOENT, errno.ECONNREFUSED):
        raise NoMasterError(address)
      elif error_code in (errno.EPERM, errno.EACCES):
        raise PermissionError(address)
      elif error_code == errno.EAGAIN:
        # Server's socket backlog is full at the moment
        raise utils.RetryAgain()
      # Any other socket error is passed on unchanged
      raise

  def _CheckSocket(self):
    """Make sure we are connected.

    @raise ProtocolError: If there is no socket

    """
    if self.socket is None:
      raise ProtocolError("Connection is closed")

  def Send(self, msg):
    """Send a message.

    This just sends a message and doesn't wait for the response.

    @param msg: Message to send; must not contain the message terminator
    @raise ProtocolError: If the terminator is part of the message or
      the connection is closed
    @raise TimeoutError: If sending times out

    """
    if constants.LUXI_EOM in msg:
      raise ProtocolError("Message terminator found in payload")

    self._CheckSocket()
    try:
      # TODO: sendall is not guaranteed to send everything
      self.socket.sendall(msg + constants.LUXI_EOM)
    except socket.timeout as err:
      raise TimeoutError("Sending timeout: %s" % str(err))

  def Recv(self):
    """Try to receive a message from the socket.

    In case we already have messages queued, we just return from the
    queue. Otherwise, we try to read data with a _rwtimeout network
    timeout, and making sure we don't go over 2x_rwtimeout as a global
    limit.

    @return: The oldest message not yet returned
    @raise ProtocolError: If the connection is closed
    @raise TimeoutError: If a single receive or the overall deadline
      times out
    @raise ConnectionClosedError: If the peer closes the connection

    """
    self._CheckSocket()
    # The deadline is only checked before a receive is started; together
    # with the per-receive timeout this gives the 2x_rwtimeout limit
    etime = time.time() + self._rwtimeout
    while not self._msgs:
      if time.time() > etime:
        raise TimeoutError("Extended receive timeout")
      while True:
        try:
          data = self.socket.recv(4096)
        except socket.timeout as err:
          raise TimeoutError("Receive timeout: %s" % str(err))
        except socket.error as err:
          if err.args and err.args[0] == errno.EAGAIN:
            # Nothing available right now, try again
            continue
          raise
        break
      if not data:
        raise ConnectionClosedError("Connection closed while reading")
      new_msgs = (self._buffer + data).split(constants.LUXI_EOM)
      # The last element is the (possibly empty) start of a message
      # whose terminator hasn't been received yet
      self._buffer = new_msgs.pop()
      self._msgs.extend(new_msgs)
    return self._msgs.popleft()

  def Call(self, msg):
    """Send a message and wait for the response.

    This is just a wrapper over Send and Recv.

    """
    self.Send(msg)
    return self.Recv()

  def Close(self):
    """Close the socket; does nothing if it is closed already."""
    if self.socket is not None:
      self.socket.close()
      self.socket = None
290

    
291

    
292
def ParseRequest(msg):
  """Decodes a serialized LUXI request.

  @param msg: Serialized request
  @return: Tuple of method, arguments and version; the version is None
    if the request didn't contain one
  @raise ProtocolError: If the message can't be parsed, isn't a
    dictionary or lacks the method or the arguments

  """
  try:
    parsed = serializer.LoadJson(msg)
  except ValueError as err:
    raise ProtocolError("Invalid LUXI request (parsing error): %s" % err)

  logging.debug("LUXI request: %s", parsed)

  if not isinstance(parsed, dict):
    logging.error("LUXI request not a dict: %r", msg)
    raise ProtocolError("Invalid LUXI request (not a dict)")

  req_method = parsed.get(KEY_METHOD) # pylint: disable=E1103
  req_args = parsed.get(KEY_ARGS) # pylint: disable=E1103

  # Method and arguments are mandatory, the version is not
  if req_method is None or req_args is None:
    logging.error("LUXI request missing method or arguments: %r", msg)
    raise ProtocolError(("Invalid LUXI request (no method or arguments"
                         " in request): %r") % msg)

  req_version = parsed.get(KEY_VERSION) # pylint: disable=E1103

  return (req_method, req_args, req_version)
317

    
318

    
319
def ParseResponse(msg):
  """Decodes a serialized LUXI response.

  @param msg: Serialized response
  @return: Tuple of success value, result and version; the version is
    None if the response didn't contain one
  @raise ProtocolError: If the message can't be deserialized or is not a
    dictionary containing both the success and the result key

  """
  # Deserialize the message
  try:
    response = serializer.LoadJson(msg)
  except KeyboardInterrupt:
    raise
  except Exception as err:
    raise ProtocolError("Error while deserializing response: %s" % str(err))

  # Check the structure of the response
  if (not isinstance(response, dict) or
      KEY_SUCCESS not in response or
      KEY_RESULT not in response):
    raise ProtocolError("Invalid response from server: %r" % response)

  resp_version = response.get(KEY_VERSION, None) # pylint: disable=E1103

  return (response[KEY_SUCCESS], response[KEY_RESULT], resp_version)
339

    
340

    
341
def FormatResponse(success, result, version=None):
  """Serializes a LUXI response.

  @param success: Value stored under the success key
  @param result: Value stored under the result key
  @param version: Version to add to the response; left out if None
  @return: Response as serialized by L{serializer.DumpJson}

  """
  payload = {}
  payload[KEY_SUCCESS] = success
  payload[KEY_RESULT] = result

  # The version is optional
  if version is not None:
    payload[KEY_VERSION] = version

  logging.debug("LUXI response: %s", payload)

  return serializer.DumpJson(payload)
356

    
357

    
358
def FormatRequest(method, args, version=None):
  """Serializes a LUXI request.

  @param method: Value stored under the method key
  @param args: Value stored under the arguments key
  @param version: Version to add to the request; left out if None
  @return: Request as serialized by L{serializer.DumpJson}

  """
  payload = {}
  payload[KEY_METHOD] = method
  payload[KEY_ARGS] = args

  # The version is optional
  if version is not None:
    payload[KEY_VERSION] = version

  return serializer.DumpJson(payload)
373

    
374

    
375
def CallLuxiMethod(transport_cb, method, args, version=None):
  """Sends a LUXI request through a transport and returns the result.

  @param transport_cb: Callable taking the serialized request and
    returning the serialized response
  @param method: Name of the method to call
  @param args: Arguments for the method
  @param version: Version sent along with the request, also used for
    checking the version of the response
  @return: The result contained in a successful response
  @raise errors.LuxiError: If the response carries a version different
    from the one passed in
  @raise RequestError: If the response is unsuccessful and
    L{errors.MaybeRaise} didn't raise anything by itself

  """
  assert callable(transport_cb)

  # Serialize the request, send it and wait for the response
  response_msg = transport_cb(FormatRequest(method, args, version=version))

  (success, result, resp_version) = ParseResponse(response_msg)

  # Responses without a version are accepted as they are
  if not (resp_version is None or resp_version == version):
    raise errors.LuxiError("LUXI version mismatch, client %s, response %s" %
                           (version, resp_version))

  if not success:
    errors.MaybeRaise(result)
    raise RequestError(result)

  return result
398

    
399

    
400
class Client(object):
  """High-level client implementation.

  This uses a backing Transport-like class on top of which it
  implements data serialization/deserialization.

  """
  def __init__(self, address=None, timeouts=None, transport=Transport):
    """Constructor for the Client class.

    Arguments:
      - address: a valid address for the used transport class
      - timeouts: a list of timeouts, to be used on connect and read/write
      - transport: a Transport-like class

    If address is not passed, L{pathutils.MASTER_SOCKET} is used.

    If timeouts is not passed, the default timeouts of the transport
    class are used.

    """
    if address is None:
      address = pathutils.MASTER_SOCKET
    self.address = address
    self.timeouts = timeouts
    self.transport_class = transport
    self.transport = None
    self._InitTransport()

  def _InitTransport(self):
    """(Re)initialize the transport if needed.

    """
    if self.transport is None:
      self.transport = self.transport_class(self.address,
                                            timeouts=self.timeouts)

  def _CloseTransport(self):
    """Close the transport, ignoring errors.

    """
    if self.transport is None:
      return
    try:
      # Forget the transport before closing it, so a failing Close still
      # leads to a fresh transport on the next call
      old_transp = self.transport
      self.transport = None
      old_transp.Close()
    except Exception: # pylint: disable=W0703
      pass

  def _SendMethodCall(self, data):
    """Send a serialized request and return the serialized response.

    On any error the transport is closed before the exception is passed
    on, so that the next call sets up a new one.

    """
    # Send request and wait for response
    try:
      self._InitTransport()
      return self.transport.Call(data)
    except Exception:
      self._CloseTransport()
      raise

  def Close(self):
    """Close the underlying connection.

    """
    self._CloseTransport()

  def CallMethod(self, method, args):
    """Send a generic request and return the response.

    @param method: Name of the LUXI method
    @type args: list or tuple
    @param args: Arguments for the method
    @raise errors.ProgrammerError: If args is neither a list nor a tuple

    """
    if not isinstance(args, (list, tuple)):
      raise errors.ProgrammerError("Invalid parameter passed to CallMethod:"
                                   " expected list, got %s" % type(args))
    return CallLuxiMethod(self._SendMethodCall, method, args,
                          version=constants.LUXI_VERSION)

  def SetQueueDrainFlag(self, drain_flag):
    """Send a SetDrainFlag request with the given flag."""
    return self.CallMethod(REQ_SET_DRAIN_FLAG, (drain_flag, ))

  def SetWatcherPause(self, until):
    """Send a SetWatcherPause request with the given value."""
    return self.CallMethod(REQ_SET_WATCHER_PAUSE, (until, ))

  def SubmitJob(self, ops):
    """Send a SubmitJob request for one job.

    @param ops: Iterable of opcodes; the state returned by the
      __getstate__ method of each one is what gets sent

    """
    ops_state = [op.__getstate__() for op in ops]
    return self.CallMethod(REQ_SUBMIT_JOB, (ops_state, ))

  def SubmitManyJobs(self, jobs):
    """Send a SubmitManyJobs request.

    @param jobs: Iterable of jobs, each of them an iterable of opcodes as
      accepted by L{SubmitJob}

    """
    jobs_state = [[op.__getstate__() for op in ops] for ops in jobs]
    return self.CallMethod(REQ_SUBMIT_MANY_JOBS, (jobs_state, ))

  @staticmethod
  def _PrepareJobId(request_name, job_id):
    """Convert a job ID to an integer.

    @param request_name: Name of the request, used in the error message
    @param job_id: Job ID as passed in by the caller
    @rtype: int
    @raise RequestError: If the job ID can't be converted to an integer

    """
    try:
      return int(job_id)
    except (ValueError, TypeError):
      # int() raises TypeError instead of ValueError for values such as
      # None; both mean the caller passed an unusable job ID
      raise RequestError("Invalid parameter passed to %s as job id: "
                         " expected integer, got value %s" %
                         (request_name, job_id))

  def CancelJob(self, job_id):
    """Send a CancelJob request for the given job."""
    job_id = Client._PrepareJobId(REQ_CANCEL_JOB, job_id)
    return self.CallMethod(REQ_CANCEL_JOB, (job_id, ))

  def ArchiveJob(self, job_id):
    """Send an ArchiveJob request for the given job."""
    job_id = Client._PrepareJobId(REQ_ARCHIVE_JOB, job_id)
    return self.CallMethod(REQ_ARCHIVE_JOB, (job_id, ))

  def ChangeJobPriority(self, job_id, priority):
    """Send a ChangeJobPriority request for the given job."""
    job_id = Client._PrepareJobId(REQ_CHANGE_JOB_PRIORITY, job_id)
    return self.CallMethod(REQ_CHANGE_JOB_PRIORITY, (job_id, priority))

  def AutoArchiveJobs(self, age):
    """Send an AutoArchiveJobs request.

    Besides the age, a timeout derived from the default read/write
    timeout is sent.

    """
    timeout = (DEF_RWTO - 1) / 2
    return self.CallMethod(REQ_AUTO_ARCHIVE_JOBS, (age, timeout))

  def WaitForJobChangeOnce(self, job_id, fields,
                           prev_job_info, prev_log_serial,
                           timeout=WFJC_TIMEOUT):
    """Waits for changes on a job.

    @param job_id: Job ID
    @type fields: list
    @param fields: List of field names to be observed
    @type prev_job_info: None or list
    @param prev_job_info: Previously received job information
    @type prev_log_serial: None or int/long
    @param prev_log_serial: Highest log serial number previously received
    @type timeout: int/float
    @param timeout: Timeout in seconds (values larger than L{WFJC_TIMEOUT} will
                    be capped to that value)

    """
    assert timeout >= 0, "Timeout can not be negative"
    return self.CallMethod(REQ_WAIT_FOR_JOB_CHANGE,
                           (job_id, fields, prev_job_info,
                            prev_log_serial,
                            min(WFJC_TIMEOUT, timeout)))

  def WaitForJobChange(self, job_id, fields, prev_job_info, prev_log_serial):
    """Waits until a job has changed.

    L{WaitForJobChangeOnce} is called over and over again until it
    returns something else than L{constants.JOB_NOTCHANGED}.

    """
    job_id = Client._PrepareJobId(REQ_WAIT_FOR_JOB_CHANGE, job_id)
    while True:
      result = self.WaitForJobChangeOnce(job_id, fields,
                                         prev_job_info, prev_log_serial)
      if result != constants.JOB_NOTCHANGED:
        return result

  def Query(self, what, fields, qfilter):
    """Query for resources/items.

    @param what: One of L{constants.QR_VIA_LUXI}
    @type fields: List of strings
    @param fields: List of requested fields
    @type qfilter: None or list
    @param qfilter: Query filter
    @rtype: L{objects.QueryResponse}

    """
    result = self.CallMethod(REQ_QUERY, (what, fields, qfilter))
    return objects.QueryResponse.FromDict(result)

  def QueryFields(self, what, fields):
    """Query for available fields.

    @param what: One of L{constants.QR_VIA_LUXI}
    @type fields: None or list of strings
    @param fields: List of requested fields
    @rtype: L{objects.QueryFieldsResponse}

    """
    result = self.CallMethod(REQ_QUERY_FIELDS, (what, fields))
    return objects.QueryFieldsResponse.FromDict(result)

  def QueryJobs(self, job_ids, fields):
    """Send a QueryJobs request."""
    return self.CallMethod(REQ_QUERY_JOBS, (job_ids, fields))

  def QueryInstances(self, names, fields, use_locking):
    """Send a QueryInstances request."""
    return self.CallMethod(REQ_QUERY_INSTANCES, (names, fields, use_locking))

  def QueryNodes(self, names, fields, use_locking):
    """Send a QueryNodes request."""
    return self.CallMethod(REQ_QUERY_NODES, (names, fields, use_locking))

  def QueryGroups(self, names, fields, use_locking):
    """Send a QueryGroups request."""
    return self.CallMethod(REQ_QUERY_GROUPS, (names, fields, use_locking))

  def QueryNetworks(self, names, fields, use_locking):
    """Send a QueryNetworks request."""
    return self.CallMethod(REQ_QUERY_NETWORKS, (names, fields, use_locking))

  def QueryExports(self, nodes, use_locking):
    """Send a QueryExports request."""
    return self.CallMethod(REQ_QUERY_EXPORTS, (nodes, use_locking))

  def QueryClusterInfo(self):
    """Send a QueryClusterInfo request."""
    return self.CallMethod(REQ_QUERY_CLUSTER_INFO, ())

  def QueryConfigValues(self, fields):
    """Send a QueryConfigValues request."""
    return self.CallMethod(REQ_QUERY_CONFIG_VALUES, (fields, ))

  def QueryTags(self, kind, name):
    """Send a QueryTags request."""
    return self.CallMethod(REQ_QUERY_TAGS, (kind, name))