Add directory for archived jobs
[ganeti-local] / lib / jqueue.py
1 #
2 #
3
4 # Copyright (C) 2006, 2007 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Module implementing the job queue handling."""
23
24 import os
25 import logging
26 import threading
27 import errno
28 import re
29 import time
30
31 from ganeti import constants
32 from ganeti import serializer
33 from ganeti import workerpool
34 from ganeti import opcodes
35 from ganeti import errors
36 from ganeti import mcpu
37 from ganeti import utils
38 from ganeti import rpc
39
40
41 JOBQUEUE_THREADS = 5
42
43
44 class _QueuedOpCode(object):
45   """Encasulates an opcode object.
46
47   Access is synchronized by the '_lock' attribute.
48
49   The 'log' attribute holds the execution log and consists of tuples
50   of the form (timestamp, level, message).
51
52   """
53   def __init__(self, op):
54     self.__Setup(op, constants.OP_STATUS_QUEUED, None, [])
55
56   def __Setup(self, input_, status, result, log):
57     self._lock = threading.Lock()
58     self.input = input_
59     self.status = status
60     self.result = result
61     self.log = log
62
63   @classmethod
64   def Restore(cls, state):
65     obj = object.__new__(cls)
66     obj.__Setup(opcodes.OpCode.LoadOpCode(state["input"]),
67                 state["status"], state["result"], state["log"])
68     return obj
69
70   @utils.LockedMethod
71   def Serialize(self):
72     return {
73       "input": self.input.__getstate__(),
74       "status": self.status,
75       "result": self.result,
76       "log": self.log,
77       }
78
79   @utils.LockedMethod
80   def GetInput(self):
81     """Returns the original opcode.
82
83     """
84     return self.input
85
86   @utils.LockedMethod
87   def SetStatus(self, status, result):
88     """Update the opcode status and result.
89
90     """
91     self.status = status
92     self.result = result
93
94   @utils.LockedMethod
95   def GetStatus(self):
96     """Get the opcode status.
97
98     """
99     return self.status
100
101   @utils.LockedMethod
102   def GetResult(self):
103     """Get the opcode result.
104
105     """
106     return self.result
107
108   @utils.LockedMethod
109   def Log(self, *args):
110     """Append a log entry.
111
112     """
113     assert len(args) < 2
114
115     if len(args) == 1:
116       log_type = constants.ELOG_MESSAGE
117       log_msg = args[0]
118     else:
119       log_type, log_msg = args
120     self.log.append((time.time(), log_type, log_msg))
121
122   @utils.LockedMethod
123   def RetrieveLog(self, start_at=0):
124     """Retrieve (a part of) the execution log.
125
126     """
127     return self.log[start_at:]
128
129
130 class _QueuedJob(object):
131   """In-memory job representation.
132
133   This is what we use to track the user-submitted jobs.
134
135   """
136   def __init__(self, storage, job_id, ops):
137     if not ops:
138       # TODO
139       raise Exception("No opcodes")
140
141     self.__Setup(storage, job_id, [_QueuedOpCode(op) for op in ops], -1)
142
143   def __Setup(self, storage, job_id, ops, run_op_index):
144     self._lock = threading.Lock()
145     self.storage = storage
146     self.id = job_id
147     self._ops = ops
148     self.run_op_index = run_op_index
149
150   @classmethod
151   def Restore(cls, storage, state):
152     obj = object.__new__(cls)
153     op_list = [_QueuedOpCode.Restore(op_state) for op_state in state["ops"]]
154     obj.__Setup(storage, state["id"], op_list, state["run_op_index"])
155     return obj
156
157   def Serialize(self):
158     return {
159       "id": self.id,
160       "ops": [op.Serialize() for op in self._ops],
161       "run_op_index": self.run_op_index,
162       }
163
164   def SetUnclean(self, msg):
165     try:
166       for op in self._ops:
167         op.SetStatus(constants.OP_STATUS_ERROR, msg)
168     finally:
169       self.storage.UpdateJob(self)
170
171   def GetStatus(self):
172     status = constants.JOB_STATUS_QUEUED
173
174     all_success = True
175     for op in self._ops:
176       op_status = op.GetStatus()
177       if op_status == constants.OP_STATUS_SUCCESS:
178         continue
179
180       all_success = False
181
182       if op_status == constants.OP_STATUS_QUEUED:
183         pass
184       elif op_status == constants.OP_STATUS_RUNNING:
185         status = constants.JOB_STATUS_RUNNING
186       elif op_status == constants.OP_STATUS_ERROR:
187         status = constants.JOB_STATUS_ERROR
188         # The whole job fails if one opcode failed
189         break
190
191     if all_success:
192       status = constants.JOB_STATUS_SUCCESS
193
194     return status
195
196   @utils.LockedMethod
197   def GetRunOpIndex(self):
198     return self.run_op_index
199
200   def Run(self, proc):
201     """Job executor.
202
203     This functions processes a this job in the context of given processor
204     instance.
205
206     Args:
207     - proc: Ganeti Processor to run the job with
208
209     """
210     try:
211       count = len(self._ops)
212       for idx, op in enumerate(self._ops):
213         try:
214           logging.debug("Op %s/%s: Starting %s", idx + 1, count, op)
215
216           self._lock.acquire()
217           try:
218             self.run_op_index = idx
219           finally:
220             self._lock.release()
221
222           op.SetStatus(constants.OP_STATUS_RUNNING, None)
223           self.storage.UpdateJob(self)
224
225           result = proc.ExecOpCode(op.input, op.Log)
226
227           op.SetStatus(constants.OP_STATUS_SUCCESS, result)
228           self.storage.UpdateJob(self)
229           logging.debug("Op %s/%s: Successfully finished %s",
230                         idx + 1, count, op)
231         except Exception, err:
232           try:
233             op.SetStatus(constants.OP_STATUS_ERROR, str(err))
234             logging.debug("Op %s/%s: Error in %s", idx + 1, count, op)
235           finally:
236             self.storage.UpdateJob(self)
237           raise
238
239     except errors.GenericError, err:
240       logging.error("ganeti exception %s", exc_info=err)
241     except Exception, err:
242       logging.error("unhandled exception %s", exc_info=err)
243     except:
244       logging.error("unhandled unknown exception %s", exc_info=err)
245
246
247 class _JobQueueWorker(workerpool.BaseWorker):
248   def RunTask(self, job):
249     logging.debug("Worker %s processing job %s",
250                   self.worker_id, job.id)
251     # TODO: feedback function
252     proc = mcpu.Processor(self.pool.context)
253     try:
254       job.Run(proc)
255     finally:
256       logging.debug("Worker %s finished job %s, status = %s",
257                     self.worker_id, job.id, job.GetStatus())
258
259
260 class _JobQueueWorkerPool(workerpool.WorkerPool):
261   def __init__(self, context):
262     super(_JobQueueWorkerPool, self).__init__(JOBQUEUE_THREADS,
263                                               _JobQueueWorker)
264     self.context = context
265
266
267 class JobStorageBase(object):
268   def __init__(self, id_prefix):
269     self.id_prefix = id_prefix
270
271     if id_prefix:
272       prefix_pattern = re.escape("%s-" % id_prefix)
273     else:
274       prefix_pattern = ""
275
276     # Apart from the prefix, all job IDs are numeric
277     self._re_job_id = re.compile(r"^%s\d+$" % prefix_pattern)
278
279   def OwnsJobId(self, job_id):
280     return self._re_job_id.match(job_id)
281
282   def FormatJobID(self, job_id):
283     if not isinstance(job_id, (int, long)):
284       raise errors.ProgrammerError("Job ID '%s' not numeric" % job_id)
285     if job_id < 0:
286       raise errors.ProgrammerError("Job ID %s is negative" % job_id)
287
288     if self.id_prefix:
289       prefix = "%s-" % self.id_prefix
290     else:
291       prefix = ""
292
293     return "%s%010d" % (prefix, job_id)
294
295
296 class DiskJobStorage(JobStorageBase):
297   _RE_JOB_FILE = re.compile(r"^job-(%s)$" % constants.JOB_ID_TEMPLATE)
298
299   def __init__(self, id_prefix):
300     JobStorageBase.__init__(self, id_prefix)
301
302     self._lock = threading.Lock()
303     self._memcache = {}
304     self._my_hostname = utils.HostInfo().name
305
306     # Make sure our directories exists
307     for path in (constants.QUEUE_DIR, constants.JOB_QUEUE_ARCHIVE_DIR):
308       try:
309         os.mkdir(path, 0700)
310       except OSError, err:
311         if err.errno not in (errno.EEXIST, ):
312           raise
313
314     # Get queue lock
315     self.lock_fd = open(constants.JOB_QUEUE_LOCK_FILE, "w")
316     try:
317       utils.LockFile(self.lock_fd)
318     except:
319       self.lock_fd.close()
320       raise
321
322     # Read version
323     try:
324       version_fd = open(constants.JOB_QUEUE_VERSION_FILE, "r")
325     except IOError, err:
326       if err.errno not in (errno.ENOENT, ):
327         raise
328
329       # Setup a new queue
330       self._InitQueueUnlocked()
331
332       # Try to open again
333       version_fd = open(constants.JOB_QUEUE_VERSION_FILE, "r")
334
335     try:
336       # Try to read version
337       version = int(version_fd.read(128))
338
339       # Verify version
340       if version != constants.JOB_QUEUE_VERSION:
341         raise errors.JobQueueError("Found version %s, expected %s",
342                                    version, constants.JOB_QUEUE_VERSION)
343     finally:
344       version_fd.close()
345
346     self._last_serial = self._ReadSerial()
347     if self._last_serial is None:
348       raise errors.ConfigurationError("Can't read/parse the job queue serial"
349                                       " file")
350
351   @staticmethod
352   def _ReadSerial():
353     """Try to read the job serial file.
354
355     @rtype: None or int
356     @return: If the serial can be read, then it is returned. Otherwise None
357              is returned.
358
359     """
360     try:
361       serial_fd = open(constants.JOB_QUEUE_SERIAL_FILE, "r")
362       try:
363         # Read last serial
364         serial = int(serial_fd.read(1024).strip())
365       finally:
366         serial_fd.close()
367     except (ValueError, EnvironmentError):
368       serial = None
369
370     return serial
371
372   def Close(self):
373     assert self.lock_fd, "Queue should be open"
374
375     self.lock_fd.close()
376     self.lock_fd = None
377
378   def _InitQueueUnlocked(self):
379     assert self.lock_fd, "Queue should be open"
380
381     utils.WriteFile(constants.JOB_QUEUE_VERSION_FILE,
382                     data="%s\n" % constants.JOB_QUEUE_VERSION)
383     if self._ReadSerial() is None:
384       utils.WriteFile(constants.JOB_QUEUE_SERIAL_FILE,
385                       data="%s\n" % 0)
386
387   def _NewSerialUnlocked(self, nodes):
388     """Generates a new job identifier.
389
390     Job identifiers are unique during the lifetime of a cluster.
391
392     Returns: A string representing the job identifier.
393
394     """
395     assert self.lock_fd, "Queue should be open"
396
397     # New number
398     serial = self._last_serial + 1
399
400     # Write to file
401     utils.WriteFile(constants.JOB_QUEUE_SERIAL_FILE,
402                     data="%s\n" % serial)
403
404     # Keep it only if we were able to write the file
405     self._last_serial = serial
406
407     # Distribute the serial to the other nodes
408     try:
409       nodes.remove(self._my_hostname)
410     except ValueError:
411       pass
412
413     result = rpc.call_upload_file(nodes, constants.JOB_QUEUE_SERIAL_FILE)
414     for node in nodes:
415       if not result[node]:
416         logging.error("copy of job queue file to node %s failed", node)
417
418     return self.FormatJobID(serial)
419
420   def _GetJobPath(self, job_id):
421     return os.path.join(constants.QUEUE_DIR, "job-%s" % job_id)
422
423   def _GetArchivedJobPath(self, job_id):
424     return os.path.join(constants.JOB_QUEUE_ARCHIVE_DIR, "job-%s" % job_id)
425
426   def _GetJobIDsUnlocked(self, archived=False):
427     """Return all known job IDs.
428
429     If the parameter archived is True, archived jobs IDs will be
430     included. Currently this argument is unused.
431
432     The method only looks at disk because it's a requirement that all
433     jobs are present on disk (so in the _memcache we don't have any
434     extra IDs).
435
436     """
437     jfiles = self._ListJobFiles()
438     jlist = [m.group(1) for m in
439              [self._RE_JOB_FILE.match(name) for name in jfiles]]
440     jlist.sort()
441     return jlist
442
443   def _ListJobFiles(self):
444     assert self.lock_fd, "Queue should be open"
445
446     return [name for name in utils.ListVisibleFiles(constants.QUEUE_DIR)
447             if self._RE_JOB_FILE.match(name)]
448
449   def _LoadJobUnlocked(self, job_id):
450     assert self.lock_fd, "Queue should be open"
451
452     if job_id in self._memcache:
453       logging.debug("Found job %s in memcache", job_id)
454       return self._memcache[job_id]
455
456     filepath = self._GetJobPath(job_id)
457     logging.debug("Loading job from %s", filepath)
458     try:
459       fd = open(filepath, "r")
460     except IOError, err:
461       if err.errno in (errno.ENOENT, ):
462         return None
463       raise
464     try:
465       data = serializer.LoadJson(fd.read())
466     finally:
467       fd.close()
468
469     job = _QueuedJob.Restore(self, data)
470     self._memcache[job_id] = job
471     logging.debug("Added job %s to the cache", job_id)
472     return job
473
474   def _GetJobsUnlocked(self, job_ids):
475     if not job_ids:
476       job_ids = self._GetJobIDsUnlocked()
477
478     return [self._LoadJobUnlocked(job_id) for job_id in job_ids]
479
480   @utils.LockedMethod
481   def GetJobs(self, job_ids):
482     return self._GetJobsUnlocked(job_ids)
483
484   @utils.LockedMethod
485   def AddJob(self, ops, nodes):
486     """Create and store on disk a new job.
487
488     @type ops: list
489     @param ops: The list of OpCodes that will become the new job.
490     @type nodes: list
491     @param nodes: The list of nodes to which the new job serial will be
492                   distributed.
493
494     """
495     assert self.lock_fd, "Queue should be open"
496
497     # Get job identifier
498     job_id = self._NewSerialUnlocked(nodes)
499     job = _QueuedJob(self, job_id, ops)
500
501     # Write to disk
502     self._UpdateJobUnlocked(job)
503
504     logging.debug("Added new job %s to the cache", job_id)
505     self._memcache[job_id] = job
506
507     return job
508
509   def _UpdateJobUnlocked(self, job):
510     assert self.lock_fd, "Queue should be open"
511
512     filename = self._GetJobPath(job.id)
513     logging.debug("Writing job %s to %s", job.id, filename)
514     utils.WriteFile(filename,
515                     data=serializer.DumpJson(job.Serialize(), indent=False))
516     self._CleanCacheUnlocked([job.id])
517
518   def _CleanCacheUnlocked(self, exclude):
519     """Clean the memory cache.
520
521     The exceptions argument contains job IDs that should not be
522     cleaned.
523
524     """
525     assert isinstance(exclude, list)
526     for job in self._memcache.values():
527       if job.id in exclude:
528         continue
529       if job.GetStatus() not in (constants.JOB_STATUS_QUEUED,
530                                  constants.JOB_STATUS_RUNNING):
531         logging.debug("Cleaning job %s from the cache", job.id)
532         try:
533           del self._memcache[job.id]
534         except KeyError:
535           pass
536
537   @utils.LockedMethod
538   def UpdateJob(self, job):
539     return self._UpdateJobUnlocked(job)
540
541   def ArchiveJob(self, job_id):
542     raise NotImplementedError()
543
544
545 class JobQueue:
546   """The job queue.
547
548   """
549   def __init__(self, context):
550     self._lock = threading.Lock()
551     self._jobs = DiskJobStorage("")
552     self._wpool = _JobQueueWorkerPool(context)
553
554     for job in self._jobs.GetJobs(None):
555       status = job.GetStatus()
556       if status in (constants.JOB_STATUS_QUEUED, ):
557         self._wpool.AddTask(job)
558
559       elif status in (constants.JOB_STATUS_RUNNING, ):
560         logging.warning("Unfinished job %s found: %s", job.id, job)
561         job.SetUnclean("Unclean master daemon shutdown")
562
563   @utils.LockedMethod
564   def SubmitJob(self, ops, nodes):
565     """Add a new job to the queue.
566
567     This enters the job into our job queue and also puts it on the new
568     queue, in order for it to be picked up by the queue processors.
569
570     @type ops: list
571     @param ops: the sequence of opcodes that will become the new job
572     @type nodes: list
573     @param nodes: the list of nodes to which the queue should be
574                   distributed
575
576     """
577     job = self._jobs.AddJob(ops, nodes)
578
579     # Add to worker pool
580     self._wpool.AddTask(job)
581
582     return job.id
583
584   def ArchiveJob(self, job_id):
585     raise NotImplementedError()
586
587   def CancelJob(self, job_id):
588     raise NotImplementedError()
589
590   def _GetJobInfo(self, job, fields):
591     row = []
592     for fname in fields:
593       if fname == "id":
594         row.append(job.id)
595       elif fname == "status":
596         row.append(job.GetStatus())
597       elif fname == "ops":
598         row.append([op.GetInput().__getstate__() for op in job._ops])
599       elif fname == "opresult":
600         row.append([op.GetResult() for op in job._ops])
601       elif fname == "opstatus":
602         row.append([op.GetStatus() for op in job._ops])
603       elif fname == "ticker":
604         ji = job.GetRunOpIndex()
605         if ji < 0:
606           lmsg = None
607         else:
608           lmsg = job._ops[ji].RetrieveLog(-1)
609           # message might be empty here
610           if lmsg:
611             lmsg = lmsg[0]
612           else:
613             lmsg = None
614         row.append(lmsg)
615       else:
616         raise errors.OpExecError("Invalid job query field '%s'" % fname)
617     return row
618
619   def QueryJobs(self, job_ids, fields):
620     """Returns a list of jobs in queue.
621
622     Args:
623     - job_ids: Sequence of job identifiers or None for all
624     - fields: Names of fields to return
625
626     """
627     self._lock.acquire()
628     try:
629       jobs = []
630
631       for job in self._jobs.GetJobs(job_ids):
632         if job is None:
633           jobs.append(None)
634         else:
635           jobs.append(self._GetJobInfo(job, fields))
636
637       return jobs
638     finally:
639       self._lock.release()
640
641   @utils.LockedMethod
642   def Shutdown(self):
643     """Stops the job queue.
644
645     """
646     self._wpool.TerminateWorkers()
647     self._jobs.Close()