Implement job canceling on server side
[ganeti-local] / lib / jqueue.py
1 #
2 #
3
4 # Copyright (C) 2006, 2007 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Module implementing the job queue handling."""
23
24 import os
25 import logging
26 import threading
27 import errno
28 import re
29 import time
30
31 from ganeti import constants
32 from ganeti import serializer
33 from ganeti import workerpool
34 from ganeti import opcodes
35 from ganeti import errors
36 from ganeti import mcpu
37 from ganeti import utils
38 from ganeti import rpc
39
40
41 JOBQUEUE_THREADS = 5
42
43
44 class _QueuedOpCode(object):
45   """Encasulates an opcode object.
46
47   Access is synchronized by the '_lock' attribute.
48
49   The 'log' attribute holds the execution log and consists of tuples
50   of the form (timestamp, level, message).
51
52   """
53   def __init__(self, op):
54     self.__Setup(op, constants.OP_STATUS_QUEUED, None, [])
55
56   def __Setup(self, input_, status, result, log):
57     self._lock = threading.Lock()
58     self.input = input_
59     self.status = status
60     self.result = result
61     self.log = log
62
63   @classmethod
64   def Restore(cls, state):
65     obj = object.__new__(cls)
66     obj.__Setup(opcodes.OpCode.LoadOpCode(state["input"]),
67                 state["status"], state["result"], state["log"])
68     return obj
69
70   @utils.LockedMethod
71   def Serialize(self):
72     return {
73       "input": self.input.__getstate__(),
74       "status": self.status,
75       "result": self.result,
76       "log": self.log,
77       }
78
79   @utils.LockedMethod
80   def GetInput(self):
81     """Returns the original opcode.
82
83     """
84     return self.input
85
86   @utils.LockedMethod
87   def SetStatus(self, status, result):
88     """Update the opcode status and result.
89
90     """
91     self.status = status
92     self.result = result
93
94   @utils.LockedMethod
95   def GetStatus(self):
96     """Get the opcode status.
97
98     """
99     return self.status
100
101   @utils.LockedMethod
102   def GetResult(self):
103     """Get the opcode result.
104
105     """
106     return self.result
107
108   @utils.LockedMethod
109   def Log(self, *args):
110     """Append a log entry.
111
112     """
113     assert len(args) < 2
114
115     if len(args) == 1:
116       log_type = constants.ELOG_MESSAGE
117       log_msg = args[0]
118     else:
119       log_type, log_msg = args
120     self.log.append((time.time(), log_type, log_msg))
121
122   @utils.LockedMethod
123   def RetrieveLog(self, start_at=0):
124     """Retrieve (a part of) the execution log.
125
126     """
127     return self.log[start_at:]
128
129
130 class _QueuedJob(object):
131   """In-memory job representation.
132
133   This is what we use to track the user-submitted jobs.
134
135   """
136   def __init__(self, storage, job_id, ops):
137     if not ops:
138       # TODO
139       raise Exception("No opcodes")
140
141     self.__Setup(storage, job_id, [_QueuedOpCode(op) for op in ops], -1)
142
143   def __Setup(self, storage, job_id, ops, run_op_index):
144     self._lock = threading.Lock()
145     self.storage = storage
146     self.id = job_id
147     self._ops = ops
148     self.run_op_index = run_op_index
149
150   @classmethod
151   def Restore(cls, storage, state):
152     obj = object.__new__(cls)
153     op_list = [_QueuedOpCode.Restore(op_state) for op_state in state["ops"]]
154     obj.__Setup(storage, state["id"], op_list, state["run_op_index"])
155     return obj
156
157   def Serialize(self):
158     return {
159       "id": self.id,
160       "ops": [op.Serialize() for op in self._ops],
161       "run_op_index": self.run_op_index,
162       }
163
164   def _SetStatus(self, status, msg):
165     try:
166       for op in self._ops:
167         op.SetStatus(status, msg)
168     finally:
169       self.storage.UpdateJob(self)
170
171   def SetUnclean(self, msg):
172     return self._SetStatus(constants.OP_STATUS_ERROR, msg)
173
174   def SetCanceled(self, msg):
175     return self._SetStatus(constants.JOB_STATUS_CANCELED, msg)
176
177   def GetStatus(self):
178     status = constants.JOB_STATUS_QUEUED
179
180     all_success = True
181     for op in self._ops:
182       op_status = op.GetStatus()
183       if op_status == constants.OP_STATUS_SUCCESS:
184         continue
185
186       all_success = False
187
188       if op_status == constants.OP_STATUS_QUEUED:
189         pass
190       elif op_status == constants.OP_STATUS_RUNNING:
191         status = constants.JOB_STATUS_RUNNING
192       elif op_status == constants.OP_STATUS_ERROR:
193         status = constants.JOB_STATUS_ERROR
194         # The whole job fails if one opcode failed
195         break
196       elif op_status == constants.OP_STATUS_CANCELED:
197         status = constants.OP_STATUS_CANCELED
198         break
199
200     if all_success:
201       status = constants.JOB_STATUS_SUCCESS
202
203     return status
204
205   @utils.LockedMethod
206   def GetRunOpIndex(self):
207     return self.run_op_index
208
209   def Run(self, proc):
210     """Job executor.
211
212     This functions processes a this job in the context of given processor
213     instance.
214
215     Args:
216     - proc: Ganeti Processor to run the job with
217
218     """
219     try:
220       count = len(self._ops)
221       for idx, op in enumerate(self._ops):
222         try:
223           logging.debug("Op %s/%s: Starting %s", idx + 1, count, op)
224
225           self._lock.acquire()
226           try:
227             self.run_op_index = idx
228           finally:
229             self._lock.release()
230
231           op.SetStatus(constants.OP_STATUS_RUNNING, None)
232           self.storage.UpdateJob(self)
233
234           result = proc.ExecOpCode(op.input, op.Log)
235
236           op.SetStatus(constants.OP_STATUS_SUCCESS, result)
237           self.storage.UpdateJob(self)
238           logging.debug("Op %s/%s: Successfully finished %s",
239                         idx + 1, count, op)
240         except Exception, err:
241           try:
242             op.SetStatus(constants.OP_STATUS_ERROR, str(err))
243             logging.debug("Op %s/%s: Error in %s", idx + 1, count, op)
244           finally:
245             self.storage.UpdateJob(self)
246           raise
247
248     except errors.GenericError, err:
249       logging.error("ganeti exception %s", exc_info=err)
250     except Exception, err:
251       logging.error("unhandled exception %s", exc_info=err)
252     except:
253       logging.error("unhandled unknown exception %s", exc_info=err)
254
255
256 class _JobQueueWorker(workerpool.BaseWorker):
257   def RunTask(self, job):
258     logging.debug("Worker %s processing job %s",
259                   self.worker_id, job.id)
260     # TODO: feedback function
261     proc = mcpu.Processor(self.pool.context)
262     try:
263       job.Run(proc)
264     finally:
265       logging.debug("Worker %s finished job %s, status = %s",
266                     self.worker_id, job.id, job.GetStatus())
267
268
269 class _JobQueueWorkerPool(workerpool.WorkerPool):
270   def __init__(self, context):
271     super(_JobQueueWorkerPool, self).__init__(JOBQUEUE_THREADS,
272                                               _JobQueueWorker)
273     self.context = context
274
275
276 class JobStorageBase(object):
277   def __init__(self, id_prefix):
278     self.id_prefix = id_prefix
279
280     if id_prefix:
281       prefix_pattern = re.escape("%s-" % id_prefix)
282     else:
283       prefix_pattern = ""
284
285     # Apart from the prefix, all job IDs are numeric
286     self._re_job_id = re.compile(r"^%s\d+$" % prefix_pattern)
287
288   def OwnsJobId(self, job_id):
289     return self._re_job_id.match(job_id)
290
291   def FormatJobID(self, job_id):
292     if not isinstance(job_id, (int, long)):
293       raise errors.ProgrammerError("Job ID '%s' not numeric" % job_id)
294     if job_id < 0:
295       raise errors.ProgrammerError("Job ID %s is negative" % job_id)
296
297     if self.id_prefix:
298       prefix = "%s-" % self.id_prefix
299     else:
300       prefix = ""
301
302     return "%s%010d" % (prefix, job_id)
303
304   def _ShouldJobBeArchivedUnlocked(self, job):
305     if job.GetStatus() not in (constants.JOB_STATUS_CANCELED,
306                                constants.JOB_STATUS_SUCCESS,
307                                constants.JOB_STATUS_ERROR):
308       logging.debug("Job %s is not yet done", job.id)
309       return False
310     return True
311
312
313 class DiskJobStorage(JobStorageBase):
314   _RE_JOB_FILE = re.compile(r"^job-(%s)$" % constants.JOB_ID_TEMPLATE)
315
316   def __init__(self, id_prefix):
317     JobStorageBase.__init__(self, id_prefix)
318
319     self._lock = threading.Lock()
320     self._memcache = {}
321     self._my_hostname = utils.HostInfo().name
322
323     # Make sure our directories exists
324     for path in (constants.QUEUE_DIR, constants.JOB_QUEUE_ARCHIVE_DIR):
325       try:
326         os.mkdir(path, 0700)
327       except OSError, err:
328         if err.errno not in (errno.EEXIST, ):
329           raise
330
331     # Get queue lock
332     self.lock_fd = open(constants.JOB_QUEUE_LOCK_FILE, "w")
333     try:
334       utils.LockFile(self.lock_fd)
335     except:
336       self.lock_fd.close()
337       raise
338
339     # Read version
340     try:
341       version_fd = open(constants.JOB_QUEUE_VERSION_FILE, "r")
342     except IOError, err:
343       if err.errno not in (errno.ENOENT, ):
344         raise
345
346       # Setup a new queue
347       self._InitQueueUnlocked()
348
349       # Try to open again
350       version_fd = open(constants.JOB_QUEUE_VERSION_FILE, "r")
351
352     try:
353       # Try to read version
354       version = int(version_fd.read(128))
355
356       # Verify version
357       if version != constants.JOB_QUEUE_VERSION:
358         raise errors.JobQueueError("Found version %s, expected %s",
359                                    version, constants.JOB_QUEUE_VERSION)
360     finally:
361       version_fd.close()
362
363     self._last_serial = self._ReadSerial()
364     if self._last_serial is None:
365       raise errors.ConfigurationError("Can't read/parse the job queue serial"
366                                       " file")
367
368   @staticmethod
369   def _ReadSerial():
370     """Try to read the job serial file.
371
372     @rtype: None or int
373     @return: If the serial can be read, then it is returned. Otherwise None
374              is returned.
375
376     """
377     try:
378       serial_fd = open(constants.JOB_QUEUE_SERIAL_FILE, "r")
379       try:
380         # Read last serial
381         serial = int(serial_fd.read(1024).strip())
382       finally:
383         serial_fd.close()
384     except (ValueError, EnvironmentError):
385       serial = None
386
387     return serial
388
389   def Close(self):
390     assert self.lock_fd, "Queue should be open"
391
392     self.lock_fd.close()
393     self.lock_fd = None
394
395   def _InitQueueUnlocked(self):
396     assert self.lock_fd, "Queue should be open"
397
398     utils.WriteFile(constants.JOB_QUEUE_VERSION_FILE,
399                     data="%s\n" % constants.JOB_QUEUE_VERSION)
400     if self._ReadSerial() is None:
401       utils.WriteFile(constants.JOB_QUEUE_SERIAL_FILE,
402                       data="%s\n" % 0)
403
404   def _NewSerialUnlocked(self, nodes):
405     """Generates a new job identifier.
406
407     Job identifiers are unique during the lifetime of a cluster.
408
409     Returns: A string representing the job identifier.
410
411     """
412     assert self.lock_fd, "Queue should be open"
413
414     # New number
415     serial = self._last_serial + 1
416
417     # Write to file
418     utils.WriteFile(constants.JOB_QUEUE_SERIAL_FILE,
419                     data="%s\n" % serial)
420
421     # Keep it only if we were able to write the file
422     self._last_serial = serial
423
424     # Distribute the serial to the other nodes
425     try:
426       nodes.remove(self._my_hostname)
427     except ValueError:
428       pass
429
430     result = rpc.call_upload_file(nodes, constants.JOB_QUEUE_SERIAL_FILE)
431     for node in nodes:
432       if not result[node]:
433         logging.error("copy of job queue file to node %s failed", node)
434
435     return self.FormatJobID(serial)
436
437   def _GetJobPath(self, job_id):
438     return os.path.join(constants.QUEUE_DIR, "job-%s" % job_id)
439
440   def _GetArchivedJobPath(self, job_id):
441     return os.path.join(constants.JOB_QUEUE_ARCHIVE_DIR, "job-%s" % job_id)
442
443   def _ExtractJobID(self, name):
444     m = self._RE_JOB_FILE.match(name)
445     if m:
446       return m.group(1)
447     else:
448       return None
449
450   def _GetJobIDsUnlocked(self, archived=False):
451     """Return all known job IDs.
452
453     If the parameter archived is True, archived jobs IDs will be
454     included. Currently this argument is unused.
455
456     The method only looks at disk because it's a requirement that all
457     jobs are present on disk (so in the _memcache we don't have any
458     extra IDs).
459
460     """
461     jlist = [self._ExtractJobID(name) for name in self._ListJobFiles()]
462     jlist.sort()
463     return jlist
464
465   def _ListJobFiles(self):
466     assert self.lock_fd, "Queue should be open"
467
468     return [name for name in utils.ListVisibleFiles(constants.QUEUE_DIR)
469             if self._RE_JOB_FILE.match(name)]
470
471   def _LoadJobUnlocked(self, job_id):
472     assert self.lock_fd, "Queue should be open"
473
474     if job_id in self._memcache:
475       logging.debug("Found job %s in memcache", job_id)
476       return self._memcache[job_id]
477
478     filepath = self._GetJobPath(job_id)
479     logging.debug("Loading job from %s", filepath)
480     try:
481       fd = open(filepath, "r")
482     except IOError, err:
483       if err.errno in (errno.ENOENT, ):
484         return None
485       raise
486     try:
487       data = serializer.LoadJson(fd.read())
488     finally:
489       fd.close()
490
491     job = _QueuedJob.Restore(self, data)
492     self._memcache[job_id] = job
493     logging.debug("Added job %s to the cache", job_id)
494     return job
495
496   def _GetJobsUnlocked(self, job_ids):
497     if not job_ids:
498       job_ids = self._GetJobIDsUnlocked()
499
500     return [self._LoadJobUnlocked(job_id) for job_id in job_ids]
501
502   @utils.LockedMethod
503   def GetJobs(self, job_ids):
504     return self._GetJobsUnlocked(job_ids)
505
506   @utils.LockedMethod
507   def AddJob(self, ops, nodes):
508     """Create and store on disk a new job.
509
510     @type ops: list
511     @param ops: The list of OpCodes that will become the new job.
512     @type nodes: list
513     @param nodes: The list of nodes to which the new job serial will be
514                   distributed.
515
516     """
517     assert self.lock_fd, "Queue should be open"
518
519     # Get job identifier
520     job_id = self._NewSerialUnlocked(nodes)
521     job = _QueuedJob(self, job_id, ops)
522
523     # Write to disk
524     self._UpdateJobUnlocked(job)
525
526     logging.debug("Added new job %s to the cache", job_id)
527     self._memcache[job_id] = job
528
529     return job
530
531   def _UpdateJobUnlocked(self, job):
532     assert self.lock_fd, "Queue should be open"
533
534     filename = self._GetJobPath(job.id)
535     logging.debug("Writing job %s to %s", job.id, filename)
536     utils.WriteFile(filename,
537                     data=serializer.DumpJson(job.Serialize(), indent=False))
538     self._CleanCacheUnlocked([job.id])
539
540   def _CleanCacheUnlocked(self, exclude):
541     """Clean the memory cache.
542
543     The exceptions argument contains job IDs that should not be
544     cleaned.
545
546     """
547     assert isinstance(exclude, list)
548     for job in self._memcache.values():
549       if job.id in exclude:
550         continue
551       if job.GetStatus() not in (constants.JOB_STATUS_QUEUED,
552                                  constants.JOB_STATUS_RUNNING):
553         logging.debug("Cleaning job %s from the cache", job.id)
554         try:
555           del self._memcache[job.id]
556         except KeyError:
557           pass
558
559   @utils.LockedMethod
560   def UpdateJob(self, job):
561     return self._UpdateJobUnlocked(job)
562
563   # TODO: Figure out locking
564   #@utils.LockedMethod
565   def CancelJob(self, job_id):
566     """Cancels a job.
567
568     @type job_id: string
569     @param job_id: Job ID of job to be cancelled.
570
571     """
572     logging.debug("Cancelling job %s", job_id)
573
574     self._lock.acquire()
575     try:
576       job = self._LoadJobUnlocked(job_id)
577     finally:
578       self._lock.release()
579     if not job:
580       logging.debug("Job %s not found", job_id)
581       return
582
583     if job.GetStatus() not in (constants.JOB_STATUS_QUEUED,):
584       logging.debug("Job %s is no longer in the queue", job.id)
585       return
586
587     job.SetCanceled("Job cancelled by request")
588
589   @utils.LockedMethod
590   def ArchiveJob(self, job_id):
591     """Archives a job.
592
593     @type job_id: string
594     @param job_id: Job ID of job to be archived.
595
596     """
597     logging.debug("Archiving job %s", job_id)
598
599     job = self._LoadJobUnlocked(job_id)
600     if not job:
601       logging.debug("Job %s not found", job_id)
602       return
603
604     if not self._ShouldJobBeArchivedUnlocked(job):
605       return
606
607     try:
608       old = self._GetJobPath(job.id)
609       new = self._GetArchivedJobPath(job.id)
610
611       os.rename(old, new)
612
613       logging.debug("Successfully archived job %s", job.id)
614     finally:
615       # Cleaning the cache because we don't know what os.rename actually did
616       # and to be on the safe side.
617       self._CleanCacheUnlocked([])
618
619
620 class JobQueue:
621   """The job queue.
622
623   """
624   def __init__(self, context):
625     self._lock = threading.Lock()
626     self._jobs = DiskJobStorage("")
627     self._wpool = _JobQueueWorkerPool(context)
628
629     for job in self._jobs.GetJobs(None):
630       status = job.GetStatus()
631       if status in (constants.JOB_STATUS_QUEUED, ):
632         self._wpool.AddTask(job)
633
634       elif status in (constants.JOB_STATUS_RUNNING, ):
635         logging.warning("Unfinished job %s found: %s", job.id, job)
636         job.SetUnclean("Unclean master daemon shutdown")
637
638   @utils.LockedMethod
639   def SubmitJob(self, ops, nodes):
640     """Add a new job to the queue.
641
642     This enters the job into our job queue and also puts it on the new
643     queue, in order for it to be picked up by the queue processors.
644
645     @type ops: list
646     @param ops: the sequence of opcodes that will become the new job
647     @type nodes: list
648     @param nodes: the list of nodes to which the queue should be
649                   distributed
650
651     """
652     job = self._jobs.AddJob(ops, nodes)
653
654     # Add to worker pool
655     self._wpool.AddTask(job)
656
657     return job.id
658
659   def ArchiveJob(self, job_id):
660     self._jobs.ArchiveJob(job_id)
661
662   @utils.LockedMethod
663   def CancelJob(self, job_id):
664     self._jobs.CancelJob(job_id)
665
666   def _GetJobInfo(self, job, fields):
667     row = []
668     for fname in fields:
669       if fname == "id":
670         row.append(job.id)
671       elif fname == "status":
672         row.append(job.GetStatus())
673       elif fname == "ops":
674         row.append([op.GetInput().__getstate__() for op in job._ops])
675       elif fname == "opresult":
676         row.append([op.GetResult() for op in job._ops])
677       elif fname == "opstatus":
678         row.append([op.GetStatus() for op in job._ops])
679       elif fname == "ticker":
680         ji = job.GetRunOpIndex()
681         if ji < 0:
682           lmsg = None
683         else:
684           lmsg = job._ops[ji].RetrieveLog(-1)
685           # message might be empty here
686           if lmsg:
687             lmsg = lmsg[0]
688           else:
689             lmsg = None
690         row.append(lmsg)
691       else:
692         raise errors.OpExecError("Invalid job query field '%s'" % fname)
693     return row
694
695   def QueryJobs(self, job_ids, fields):
696     """Returns a list of jobs in queue.
697
698     Args:
699     - job_ids: Sequence of job identifiers or None for all
700     - fields: Names of fields to return
701
702     """
703     self._lock.acquire()
704     try:
705       jobs = []
706
707       for job in self._jobs.GetJobs(job_ids):
708         if job is None:
709           jobs.append(None)
710         else:
711           jobs.append(self._GetJobInfo(job, fields))
712
713       return jobs
714     finally:
715       self._lock.release()
716
717   @utils.LockedMethod
718   def Shutdown(self):
719     """Stops the job queue.
720
721     """
722     self._wpool.TerminateWorkers()
723     self._jobs.Close()