Statistics
| Branch: | Tag: | Revision:

root / lib / luxi.py @ cda215a9

History | View | Annotate | Download (7.5 kB)

1
#
2
#
3

    
4
# Copyright (C) 2006, 2007, 2011, 2012, 2013 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Module for the LUXI protocol
23

24
This module implements the local unix socket protocol. You only need
25
this module and the opcodes module in the client program in order to
26
communicate with the master.
27

28
The module is also used by the master daemon.
29

30
"""
31

    
32
from ganeti import constants
33
from ganeti import objects
34
import ganeti.rpc.client as cl
35
from ganeti.rpc.transport import Transport
36
from ganeti.rpc.errors import (ProtocolError, ConnectionClosedError,
37
                               TimeoutError, RequestError, NoMasterError,
38
                               PermissionError)
39

    
40
__all__ = [
41
  # functions:
42
  "ProtocolError",
43
  "ConnectionClosedError",
44
  "TimeoutError",
45
  "RequestError",
46
  "NoMasterError",
47
  "PermissionError",
48
  # classes:
49
  "Client"
50
  ]
51

    
52
REQ_SUBMIT_JOB = constants.LUXI_REQ_SUBMIT_JOB
53
REQ_SUBMIT_JOB_TO_DRAINED_QUEUE = constants.LUXI_REQ_SUBMIT_JOB_TO_DRAINED_QUEUE
54
REQ_SUBMIT_MANY_JOBS = constants.LUXI_REQ_SUBMIT_MANY_JOBS
55
REQ_PICKUP_JOB = constants.LUXI_REQ_PICKUP_JOB
56
REQ_WAIT_FOR_JOB_CHANGE = constants.LUXI_REQ_WAIT_FOR_JOB_CHANGE
57
REQ_CANCEL_JOB = constants.LUXI_REQ_CANCEL_JOB
58
REQ_ARCHIVE_JOB = constants.LUXI_REQ_ARCHIVE_JOB
59
REQ_CHANGE_JOB_PRIORITY = constants.LUXI_REQ_CHANGE_JOB_PRIORITY
60
REQ_AUTO_ARCHIVE_JOBS = constants.LUXI_REQ_AUTO_ARCHIVE_JOBS
61
REQ_QUERY = constants.LUXI_REQ_QUERY
62
REQ_QUERY_FIELDS = constants.LUXI_REQ_QUERY_FIELDS
63
REQ_QUERY_JOBS = constants.LUXI_REQ_QUERY_JOBS
64
REQ_QUERY_INSTANCES = constants.LUXI_REQ_QUERY_INSTANCES
65
REQ_QUERY_NODES = constants.LUXI_REQ_QUERY_NODES
66
REQ_QUERY_GROUPS = constants.LUXI_REQ_QUERY_GROUPS
67
REQ_QUERY_NETWORKS = constants.LUXI_REQ_QUERY_NETWORKS
68
REQ_QUERY_EXPORTS = constants.LUXI_REQ_QUERY_EXPORTS
69
REQ_QUERY_CONFIG_VALUES = constants.LUXI_REQ_QUERY_CONFIG_VALUES
70
REQ_QUERY_CLUSTER_INFO = constants.LUXI_REQ_QUERY_CLUSTER_INFO
71
REQ_QUERY_TAGS = constants.LUXI_REQ_QUERY_TAGS
72
REQ_SET_DRAIN_FLAG = constants.LUXI_REQ_SET_DRAIN_FLAG
73
REQ_SET_WATCHER_PAUSE = constants.LUXI_REQ_SET_WATCHER_PAUSE
74
REQ_ALL = constants.LUXI_REQ_ALL
75

    
76
DEF_RWTO = constants.LUXI_DEF_RWTO
77
WFJC_TIMEOUT = constants.LUXI_WFJC_TIMEOUT
78

    
79

    
80
class Client(cl.AbstractClient):
81
  """High-level client implementation.
82

83
  This uses a backing Transport-like class on top of which it
84
  implements data serialization/deserialization.
85

86
  """
87
  def __init__(self, address=None, timeouts=None, transport=Transport):
88
    """Constructor for the Client class.
89

90
    Arguments are the same as for L{AbstractClient}.
91

92
    """
93
    super(Client, self).__init__(address, timeouts, transport)
94
    # Override the version of the protocol:
95
    self.version = constants.LUXI_VERSION
96

    
97
  def SetQueueDrainFlag(self, drain_flag):
98
    return self.CallMethod(REQ_SET_DRAIN_FLAG, (drain_flag, ))
99

    
100
  def SetWatcherPause(self, until):
101
    return self.CallMethod(REQ_SET_WATCHER_PAUSE, (until, ))
102

    
103
  def PickupJob(self, job):
104
    return self.CallMethod(REQ_PICKUP_JOB, (job,))
105

    
106
  def SubmitJob(self, ops):
107
    ops_state = map(lambda op: op.__getstate__(), ops)
108
    return self.CallMethod(REQ_SUBMIT_JOB, (ops_state, ))
109

    
110
  def SubmitJobToDrainedQueue(self, ops):
111
    ops_state = map(lambda op: op.__getstate__(), ops)
112
    return self.CallMethod(REQ_SUBMIT_JOB_TO_DRAINED_QUEUE, (ops_state, ))
113

    
114
  def SubmitManyJobs(self, jobs):
115
    jobs_state = []
116
    for ops in jobs:
117
      jobs_state.append([op.__getstate__() for op in ops])
118
    return self.CallMethod(REQ_SUBMIT_MANY_JOBS, (jobs_state, ))
119

    
120
  def CancelJob(self, job_id):
121
    return self.CallMethod(REQ_CANCEL_JOB, (job_id, ))
122

    
123
  def ArchiveJob(self, job_id):
124
    return self.CallMethod(REQ_ARCHIVE_JOB, (job_id, ))
125

    
126
  def ChangeJobPriority(self, job_id, priority):
127
    return self.CallMethod(REQ_CHANGE_JOB_PRIORITY, (job_id, priority))
128

    
129
  def AutoArchiveJobs(self, age):
130
    timeout = (DEF_RWTO - 1) / 2
131
    return self.CallMethod(REQ_AUTO_ARCHIVE_JOBS, (age, timeout))
132

    
133
  def WaitForJobChangeOnce(self, job_id, fields,
134
                           prev_job_info, prev_log_serial,
135
                           timeout=WFJC_TIMEOUT):
136
    """Waits for changes on a job.
137

138
    @param job_id: Job ID
139
    @type fields: list
140
    @param fields: List of field names to be observed
141
    @type prev_job_info: None or list
142
    @param prev_job_info: Previously received job information
143
    @type prev_log_serial: None or int/long
144
    @param prev_log_serial: Highest log serial number previously received
145
    @type timeout: int/float
146
    @param timeout: Timeout in seconds (values larger than L{WFJC_TIMEOUT} will
147
                    be capped to that value)
148

149
    """
150
    assert timeout >= 0, "Timeout can not be negative"
151
    return self.CallMethod(REQ_WAIT_FOR_JOB_CHANGE,
152
                           (job_id, fields, prev_job_info,
153
                            prev_log_serial,
154
                            min(WFJC_TIMEOUT, timeout)))
155

    
156
  def WaitForJobChange(self, job_id, fields, prev_job_info, prev_log_serial):
157
    while True:
158
      result = self.WaitForJobChangeOnce(job_id, fields,
159
                                         prev_job_info, prev_log_serial)
160
      if result != constants.JOB_NOTCHANGED:
161
        break
162
    return result
163

    
164
  def Query(self, what, fields, qfilter):
165
    """Query for resources/items.
166

167
    @param what: One of L{constants.QR_VIA_LUXI}
168
    @type fields: List of strings
169
    @param fields: List of requested fields
170
    @type qfilter: None or list
171
    @param qfilter: Query filter
172
    @rtype: L{objects.QueryResponse}
173

174
    """
175
    result = self.CallMethod(REQ_QUERY, (what, fields, qfilter))
176
    return objects.QueryResponse.FromDict(result)
177

    
178
  def QueryFields(self, what, fields):
179
    """Query for available fields.
180

181
    @param what: One of L{constants.QR_VIA_LUXI}
182
    @type fields: None or list of strings
183
    @param fields: List of requested fields
184
    @rtype: L{objects.QueryFieldsResponse}
185

186
    """
187
    result = self.CallMethod(REQ_QUERY_FIELDS, (what, fields))
188
    return objects.QueryFieldsResponse.FromDict(result)
189

    
190
  def QueryJobs(self, job_ids, fields):
191
    return self.CallMethod(REQ_QUERY_JOBS, (job_ids, fields))
192

    
193
  def QueryInstances(self, names, fields, use_locking):
194
    return self.CallMethod(REQ_QUERY_INSTANCES, (names, fields, use_locking))
195

    
196
  def QueryNodes(self, names, fields, use_locking):
197
    return self.CallMethod(REQ_QUERY_NODES, (names, fields, use_locking))
198

    
199
  def QueryGroups(self, names, fields, use_locking):
200
    return self.CallMethod(REQ_QUERY_GROUPS, (names, fields, use_locking))
201

    
202
  def QueryNetworks(self, names, fields, use_locking):
203
    return self.CallMethod(REQ_QUERY_NETWORKS, (names, fields, use_locking))
204

    
205
  def QueryExports(self, nodes, use_locking):
206
    return self.CallMethod(REQ_QUERY_EXPORTS, (nodes, use_locking))
207

    
208
  def QueryClusterInfo(self):
209
    return self.CallMethod(REQ_QUERY_CLUSTER_INFO, ())
210

    
211
  def QueryConfigValues(self, fields):
212
    return self.CallMethod(REQ_QUERY_CONFIG_VALUES, (fields, ))
213

    
214
  def QueryTags(self, kind, name):
215
    return self.CallMethod(REQ_QUERY_TAGS, (kind, name))