add stdout, stderr redirection of peers to logfile
[archipelago] xseg/peers/user/mt-pfiled.c
/*
 * The Pithos File Blocker Peer (pfiled)
 */

#define _GNU_SOURCE
#include <stdio.h>
#include <stdlib.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <unistd.h>
#include <string.h>
#include <fcntl.h>
#include <errno.h>
#include <aio.h>
#include <signal.h>
#include <limits.h>
#include <pthread.h>
#include <syscall.h>
#include <sys/sendfile.h>
#include <peer.h>

#include <xseg/xseg.h>
#include <xseg/protocol.h>

#define LOCK_SUFFIX             "_lock"
#define MAX_PATH_SIZE           1024
#define MAX_FILENAME_SIZE       (XSEG_MAX_TARGETLEN + 5) /* strlen(LOCK_SUFFIX) */

/* default concurrency level (number of threads) */
#define DEFAULT_NR_OPS           16

/* Pithos hash for the zero block
 * FIXME: Should it be hardcoded?
 */
#define ZERO_BLOCK \
        "e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b85"

/*
 * Globals, holding command-line arguments
 */

void usage(char *argv0)
{
        fprintf(stderr,
                        "Usage: %s <PATH> <VPATH> [-p PORT] [-g XSEG_SPEC] [-n NR_OPS] [-v]\n\n"
                        "where:\n"
                        "\tPATH: path to pithos data blocks\n"
                        "\tVPATH: path to modified volume blocks\n"
                        "\tPORT: xseg port to listen for requests on\n"
                        "\tXSEG_SPEC: xseg spec as 'type:name:nr_ports:nr_requests:"
                        "request_size:extra_size:page_shift'\n"
                        "\tNR_OPS: number of outstanding xseg requests\n"
                        "\t-v: verbose mode\n",
                        argv0);
}

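/*
 * Example invocation (illustrative paths and placeholder spec; -p/-g/-n are
 * handled by the generic peer code, while --path/--vpath are parsed in
 * custom_peer_init() below):
 *
 *   mt-pfiled -p 0 -g <XSEG_SPEC> -n 16 \
 *          --path /srv/pithos/data/ --vpath /srv/archip/blocks/
 */
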
/* fdcache_node flags */
#define READY (1 << 1)

/* fdcache node info */
struct fdcache_node {
        volatile int fd;
        volatile unsigned int ref;
        volatile unsigned long time;
        volatile unsigned int flags;
        pthread_cond_t cond;
        char target[MAX_FILENAME_SIZE + 1];
};

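/*
 * Cache entry lifecycle: `ref` counts the in-flight requests using the fd,
 * `time` records the last use for LRU eviction, and READY is cleared while
 * a thread is opening the file (or while the entry is being invalidated);
 * other threads wait on `cond` until the entry becomes READY again.
 */
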
/* pfiled context */
struct pfiled {
        uint32_t path_len;
        uint32_t vpath_len;
        uint64_t handled_reqs;
        long maxfds;
        struct fdcache_node *fdcache;
        pthread_mutex_t cache_lock;
        char path[MAX_PATH_SIZE + 1];
        char vpath[MAX_PATH_SIZE + 1];
};

/*
 * pfiled specific structure
 * containing information on a pending I/O operation
 */
struct fio {
        uint32_t state;
        long fdcacheidx;
};

struct pfiled * __get_pfiled(struct peerd *peer)
{
        return (struct pfiled *) peer->priv;
}

struct fio * __get_fio(struct peer_req *pr)
{
        return (struct fio *) pr->priv;
}

static void close_cache_entry(struct peerd *peer, struct peer_req *pr)
{
        struct pfiled *pfiled = __get_pfiled(peer);
        struct fio *fio = __get_fio(pr);
        int fd = -1;
        if (fio->fdcacheidx >= 0) {
                if (!__sync_sub_and_fetch(&pfiled->fdcache[fio->fdcacheidx].ref, 1) &&
                                !(pfiled->fdcache[fio->fdcacheidx].flags & READY)) {
                        pthread_mutex_lock(&pfiled->cache_lock);
                        if (!pfiled->fdcache[fio->fdcacheidx].ref){
                                /* invalidate cache entry */
                                fd = pfiled->fdcache[fio->fdcacheidx].fd;
                                pfiled->fdcache[fio->fdcacheidx].fd = -1;
                                pfiled->fdcache[fio->fdcacheidx].target[0] = 0;
                                pfiled->fdcache[fio->fdcacheidx].flags |= READY;
                        }
                        pthread_mutex_unlock(&pfiled->cache_lock);
                        if (fd > 0)
                                close(fd);
                }
        }
}

static void pfiled_complete(struct peerd *peer, struct peer_req *pr)
{
        close_cache_entry(peer, pr);
        complete(peer, pr);
}

static void pfiled_fail(struct peerd *peer, struct peer_req *pr)
{
        close_cache_entry(peer, pr);
        fail(peer, pr);
}

static void handle_unknown(struct peerd *peer, struct peer_req *pr)
{
        XSEGLOG2(&lc, W, "unknown request op");
        pfiled_fail(peer, pr);
}

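/*
 * Lay out the on-disk path for a target: the first six characters of the
 * target name become three two-character subdirectories, followed by the
 * full target name. For example, a target "abcdef0123..." under path
 * "/srv/blocks/" becomes "/srv/blocks/ab/cd/ef/abcdef0123...". When mkdirs
 * is set, any missing subdirectories are created along the way.
 */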
static int create_path(char *buf, char *path, char *target, uint32_t targetlen, int mkdirs)
{
        int i;
        struct stat st;
        uint32_t pathlen = strlen(path);

        strncpy(buf, path, pathlen);

        for (i = 0; i < 9; i += 3) {
                buf[pathlen + i] = target[i - (i/3)];
                buf[pathlen + i + 1] = target[i + 1 - (i/3)];
                buf[pathlen + i + 2] = '/';
                if (mkdirs == 1) {
                        buf[pathlen + i + 3] = '\0';
retry:
                        if (stat(buf, &st) < 0)
                                if (mkdir(buf, 0700) < 0) {
                                        if (errno == EEXIST)
                                                goto retry;
                                        perror(buf);
                                        /* return a negative value, since callers check for < 0 */
                                        return -errno;
                                }
                }
        }

        strncpy(&buf[pathlen + 9], target, targetlen);
        buf[pathlen + 9 + targetlen] = '\0';

        return 0;
}

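/*
 * Look up the fd for a target in the fd cache, opening the file on a miss.
 * The least recently used idle entry is evicted, the target is first tried
 * under the (read-only) pithos path and then under the volume path, where
 * it is created if it does not exist yet. A negative fd is cached so that
 * other requests waiting on the same target fail fast. Returns the open fd
 * or a negative value on error; the entry's refcount is dropped later in
 * close_cache_entry().
 */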
static int dir_open(struct pfiled *pfiled, struct fio *io,
                char *target, uint32_t targetlen, int mode)
{
        int fd = -1;
        struct fdcache_node *ce = NULL;
        long i, lru;
        /* large enough for either path or vpath, the hashed subdirs and the target */
        char tmp[MAX_PATH_SIZE + MAX_FILENAME_SIZE + 16];
        uint64_t min;
        io->fdcacheidx = -1;
        if (targetlen > MAX_FILENAME_SIZE)
                goto out_err;

start:
        /* check cache */
        pthread_mutex_lock(&pfiled->cache_lock);
start_locked:
        lru = -1;
        min = UINT64_MAX;
        for (i = 0; i < pfiled->maxfds; i++) {
                if (pfiled->fdcache[i].ref == 0 && min > pfiled->fdcache[i].time
                                && (pfiled->fdcache[i].flags & READY)) {
                        min = pfiled->fdcache[i].time;
                        lru = i;
                }

                if (!strncmp(pfiled->fdcache[i].target, target, targetlen)) {
                        if (pfiled->fdcache[i].target[targetlen] == 0) {
                                ce = &pfiled->fdcache[i];
                                /* if any other io thread is currently opening
                                 * the file, block until it succeeds or fails
                                 */
                                if (!(ce->flags & READY)) {
                                        pthread_cond_wait(&ce->cond, &pfiled->cache_lock);
                                        /* when ready, restart lookup */
                                        goto start_locked;
                                }
                                /* if successfully opened */
                                if (ce->fd > 0) {
                                        fd = pfiled->fdcache[i].fd;
                                        io->fdcacheidx = i;
                                        goto out;
                                }
                                /* else open failed for the other io thread, so
                                 * it should fail for everyone waiting on this
                                 * file.
                                 */
                                else {
                                        fd = -1;
                                        io->fdcacheidx = -1;
                                        goto out_err_unlock;
                                }
                        }
                }
        }
        if (lru < 0){
                /* all cache entries are currently being used */
                pthread_mutex_unlock(&pfiled->cache_lock);
                goto start;
        }
        if (pfiled->fdcache[lru].ref){
                fd = -1;
                XSEGLOG2(&lc, E, "lru(%ld) ref not 0 (%u)", lru, pfiled->fdcache[lru].ref);
                goto out_err_unlock;
        }
        /* make room for new file */
        ce = &pfiled->fdcache[lru];
        /* set name here and state to not ready, for any other requests on the
         * same target that may follow
         */
        strncpy(ce->target, target, targetlen);
        ce->target[targetlen] = 0;
        ce->flags &= ~READY;
        pthread_mutex_unlock(&pfiled->cache_lock);

        if (ce->fd > 0){
                if (close(ce->fd) < 0){
                        XSEGLOG2(&lc, W, "Cannot close %s", ce->target);
                }
        }

        /* try opening it from pithos blocker dir */
        if (create_path(tmp, pfiled->path, target, targetlen, 0) < 0) {
                fd = -1;
                goto new_entry;
        }

        fd = open(tmp, O_RDWR);
        if (fd < 0) {
                /* try opening it from the tmp dir */
                if (create_path(tmp, pfiled->vpath, target, targetlen, 0) < 0)
                        goto new_entry;

                fd = open(tmp, O_RDWR);
                if (fd < 0)  {
                        if (create_path(tmp, pfiled->vpath, target, targetlen, 1) < 0) {
                                fd = -1;
                                goto new_entry;
                        }

                        fd = open(tmp, O_RDWR | O_CREAT, 0600);
                        if (fd < 0)
                                XSEGLOG2(&lc, E, "Cannot open %s", tmp);
                }
        }

        /* insert in cache; a negative fd indicates an opening error to
         * any other ios waiting for the file to open
         */
new_entry:
        pthread_mutex_lock(&pfiled->cache_lock);
        ce->fd = fd;
        ce->ref = 0;
        ce->flags = READY;
        pthread_cond_broadcast(&ce->cond);
        if (fd > 0) {
                io->fdcacheidx = lru;
        }
        else {
                io->fdcacheidx = -1;
                goto out_err_unlock;
        }

out:
        pfiled->handled_reqs++;
        ce->time = pfiled->handled_reqs;
        __sync_fetch_and_add(&ce->ref, 1);
        pthread_mutex_unlock(&pfiled->cache_lock);
out_err:
        return fd;

out_err_unlock:
        pthread_mutex_unlock(&pfiled->cache_lock);
        goto out_err;
}

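/*
 * Serve X_READ/X_WRITE: pread/pwrite in a loop until req->datalen bytes have
 * been serviced. Reads past end of file are zero-filled; writes are followed
 * by an fsync, and a failed fsync discards the whole request as unserviced.
 */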
static void handle_read_write(struct peerd *peer, struct peer_req *pr)
{
        struct pfiled *pfiled = __get_pfiled(peer);
        struct fio *fio = __get_fio(pr);
        struct xseg_request *req = pr->req;
        int r, fd;
        char *target = xseg_get_target(peer->xseg, req);
        char *data = xseg_get_data(peer->xseg, req);

        fd = dir_open(pfiled, fio, target, req->targetlen, 0);
        if (fd < 0){
                XSEGLOG2(&lc, E, "Dir open failed");
                pfiled_fail(peer, pr);
                return;
        }

        if (!req->size) {
                if (req->flags & (XF_FLUSH | XF_FUA)) {
                        /* No FLUSH/FUA support yet (O_SYNC ?).
                         * note that with FLUSH/size == 0
                         * there will probably be a (uint64_t)-1 offset */
                        pfiled_complete(peer, pr);
                        return;
                } else {
                        pfiled_complete(peer, pr);
                        return;
                }
        }

        switch (req->op) {
                case X_READ:
                        while (req->serviced < req->datalen) {
                                r = pread(fd, data + req->serviced,
                                                req->datalen - req->serviced,
                                                req->offset + req->serviced);
                                if (r < 0) {
                                        req->datalen = req->serviced;
                                        XSEGLOG2(&lc, E, "Cannot read");
                                }
                                else if (r == 0) {
                                        /* reached end of file. zero out the rest of the data buffer */
                                        memset(data + req->serviced, 0, req->datalen - req->serviced);
                                        req->serviced = req->datalen;
                                }
                                else {
                                        req->serviced += r;
                                }
                        }
                        break;
                case X_WRITE:
                        while (req->serviced < req->datalen) {
                                r = pwrite(fd, data + req->serviced,
                                                req->datalen - req->serviced,
                                                req->offset + req->serviced);
                                if (r < 0) {
                                        req->datalen = req->serviced;
                                }
                                else {
                                        req->serviced += r;
                                }
                        }
                        r = fsync(fd);
                        if (r < 0) {
                                XSEGLOG2(&lc, E, "Fsync failed.");
                                /* if fsync fails, then no bytes serviced correctly */
                                req->serviced = 0;
                        }
                        break;
                default:
                        XSEGLOG2(&lc, E, "unexpected op %u", req->op);
                        pfiled_fail(peer, pr);
                        return;
        }

        if (req->serviced > 0) {
                pfiled_complete(peer, pr);
        }
        else {
                pfiled_fail(peer, pr);
        }
        return;
}

static void handle_info(struct peerd *peer, struct peer_req *pr)
{
        struct pfiled *pfiled = __get_pfiled(peer);
        struct fio *fio = __get_fio(pr);
        struct xseg_request *req = pr->req;
        struct stat st;
        int fd, r;
        uint64_t size;
        char *target = xseg_get_target(peer->xseg, req);
        char *data = xseg_get_data(peer->xseg, req);
        struct xseg_reply_info *xinfo = (struct xseg_reply_info *)data;

        fd = dir_open(pfiled, fio, target, req->targetlen, 0);
        if (fd < 0) {
                XSEGLOG2(&lc, E, "Dir open failed");
                pfiled_fail(peer, pr);
                return;
        }

        r = fstat(fd, &st);
        if (r < 0) {
                XSEGLOG2(&lc, E, "fstat failed");
                pfiled_fail(peer, pr);
                return;
        }

        size = (uint64_t)st.st_size;
        xinfo->size = size;

        pfiled_complete(peer, pr);
}

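/*
 * Serve X_COPY: copy an existing pithos block (xcopy->target) into the
 * request target under the volume path, using sendfile(). A short copy is
 * treated as failure.
 */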
static void handle_copy(struct peerd *peer, struct peer_req *pr)
{
        struct pfiled *pfiled = __get_pfiled(peer);
        struct fio *fio = __get_fio(pr);
        struct xseg_request *req = pr->req;

        char *target = xseg_get_target(peer->xseg, req);
        char *data = xseg_get_data(peer->xseg, req);
        struct xseg_request_copy *xcopy = (struct xseg_request_copy *)data;
        struct stat st;
        char *buf = malloc(MAX_PATH_SIZE + MAX_FILENAME_SIZE);
        int n, src = -1, dst = -1, r = -1;

        if (!buf){
                XSEGLOG2(&lc, E, "Out of memory");
                pfiled_fail(peer, pr);
                return;
        }

        dst = dir_open(pfiled, fio, target, req->targetlen, 1);
        if (dst < 0) {
                XSEGLOG2(&lc, E, "Fail in dst");
                r = dst;
                goto out;
        }

        if (create_path(buf, pfiled->path, xcopy->target, xcopy->targetlen, 0) < 0)  {
                XSEGLOG2(&lc, E, "Create path failed");
                r = -1;
                goto out;
        }

        src = open(buf, O_RDWR);
        if (src < 0) {
                XSEGLOG2(&lc, E, "fail in src %s", buf);
                r = src;
                goto out;
        }

        r = fstat(src, &st);
        if (r < 0){
                XSEGLOG2(&lc, E, "fail in stat for src %s", buf);
                goto out;
        }

        n = sendfile(dst, src, 0, st.st_size);
        if (n != st.st_size) {
                XSEGLOG2(&lc, E, "Copy failed for %s", buf);
                r = -1;
                goto out;
        }
        r = 0;

out:
        if (src > 0)
                close(src);
        free(buf);
        if (r < 0)
                pfiled_fail(peer, pr);
        else
                pfiled_complete(peer, pr);
        return;
}

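/*
 * Serve X_DELETE: mark the cache entry as not READY so the fd is closed once
 * all in-flight requests on it drain, then unlink the block from the volume
 * path.
 */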
static void handle_delete(struct peerd *peer, struct peer_req *pr)
{
        struct pfiled *pfiled = __get_pfiled(peer);
        struct fio *fio = __get_fio(pr);
        struct xseg_request *req = pr->req;

        char *buf = malloc(MAX_PATH_SIZE + MAX_FILENAME_SIZE);
        int fd, r;
        char *target = xseg_get_target(peer->xseg, req);
        if (!buf){
                XSEGLOG2(&lc, E, "Out of memory");
                pfiled_fail(peer, pr);
                return;
        }
        fd = dir_open(pfiled, fio, target, req->targetlen, 0);
        if (fd < 0) {
                XSEGLOG2(&lc, E, "Dir open failed");
                r = fd;
                goto out;
        }

        /* mark cache entry as invalid
         * give a chance to pending operations on this file to end.
         * file will close when all operations are done
         */
        if (fio->fdcacheidx >= 0) {
                pthread_mutex_lock(&pfiled->cache_lock);
                pfiled->fdcache[fio->fdcacheidx].flags &= ~READY;
                pthread_mutex_unlock(&pfiled->cache_lock);
        }

        r = create_path(buf, pfiled->vpath, target, req->targetlen, 0);
        if (r < 0) {
                XSEGLOG2(&lc, E, "Create path failed");
                goto out;
        }
        r = unlink(buf);
out:
        free(buf);
        if (r < 0)
                pfiled_fail(peer, pr);
        else
                pfiled_complete(peer, pr);
        return;
}

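/*
 * Serve X_OPEN: acquire a volume lock by exclusively creating a
 * "<target>_lock" file under the volume path (O_CREAT | O_EXCL is atomic on
 * NFSv3 and later). If the lock file already exists, the request fails.
 * handle_close() releases the lock by unlinking the same file.
 */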
static void handle_open(struct peerd *peer, struct peer_req *pr)
{
        struct pfiled *pfiled = __get_pfiled(peer);
//      struct fio *fio = __get_fio(pr);
        struct xseg_request *req = pr->req;
        char *buf = malloc(MAX_FILENAME_SIZE + 1);
        char *pathname = malloc(MAX_PATH_SIZE + MAX_FILENAME_SIZE);
        int fd = -1;
        char *target = xseg_get_target(peer->xseg, req);

        if (!buf || !pathname) {
                XSEGLOG2(&lc, E, "Out of memory");
                pfiled_fail(peer, pr);
                return;
        }

        strncpy(buf, target, req->targetlen);
        strncpy(buf + req->targetlen, LOCK_SUFFIX, strlen(LOCK_SUFFIX));
        buf[req->targetlen + strlen(LOCK_SUFFIX)] = 0;

        if (create_path(pathname, pfiled->vpath, buf, req->targetlen + strlen(LOCK_SUFFIX), 1) < 0) {
                XSEGLOG2(&lc, E, "Create path failed for %s", buf);
                goto out;
        }

        //nfs v >= 3
        fd = open(pathname, O_CREAT | O_EXCL, S_IRWXU | S_IRUSR);
        if (fd < 0){
                //actual error
                if (errno != EEXIST)
                        XSEGLOG2(&lc, W, "Error opening %s", pathname);
                goto out;
        }
        close(fd);
out:
        free(buf);
        free(pathname);
        if (fd < 0)
                pfiled_fail(peer, pr);
        else
                pfiled_complete(peer, pr);
        return;
}

static void handle_close(struct peerd *peer, struct peer_req *pr)
{
        struct pfiled *pfiled = __get_pfiled(peer);
//      struct fio *fio = __get_fio(pr);
        struct xseg_request *req = pr->req;
        char *buf = malloc(MAX_FILENAME_SIZE + 1);
        char *pathname = malloc(MAX_PATH_SIZE + MAX_FILENAME_SIZE);
        char *target = xseg_get_target(peer->xseg, req);
        int r;

        if (!buf || !pathname) {
                XSEGLOG2(&lc, E, "Out of memory");
                fail(peer, pr);
                return;
        }

        strncpy(buf, target, req->targetlen);
        strncpy(buf + req->targetlen, LOCK_SUFFIX, strlen(LOCK_SUFFIX));
        buf[req->targetlen + strlen(LOCK_SUFFIX)] = 0;

        r = create_path(pathname, pfiled->vpath, buf, req->targetlen + strlen(LOCK_SUFFIX), 0);
        if (r < 0) {
                XSEGLOG2(&lc, E, "Create path failed for %s", buf);
                goto out;
        }
        r = unlink(pathname);

out:
        free(buf);
        free(pathname);
        if (r < 0)
                fail(peer, pr);
        else
                complete(peer, pr);
        return;
}

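/*
 * Entry point called by the generic peer loop: initialize the per-request
 * state on accept and route each xseg op to its handler.
 */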
int dispatch(struct peerd *peer, struct peer_req *pr, struct xseg_request *req,
                                enum dispatch_reason reason)
{
        struct fio *fio = __get_fio(pr);
        if (reason == dispatch_accept){
                fio->fdcacheidx = -1;
                fio->state = XS_ACCEPTED;
        }

        switch (req->op) {
                case X_READ:
                case X_WRITE:
                        handle_read_write(peer, pr); break;
                case X_INFO:
                        handle_info(peer, pr); break;
                case X_COPY:
                        handle_copy(peer, pr); break;
                case X_DELETE:
                        handle_delete(peer, pr); break;
                case X_OPEN:
                        handle_open(peer, pr); break;
                case X_CLOSE:
                        handle_close(peer, pr); break;
                //      case X_SNAPSHOT:
                case X_SYNC:
                default:
                        handle_unknown(peer, pr);
        }
        return 0;
}

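/*
 * Per-peer initialization: allocate the pfiled context and an fd cache of
 * 2 * nr_ops entries, attach a struct fio to every peer request, and parse
 * the --path/--vpath arguments (a trailing '/' is appended if missing).
 */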
int custom_peer_init(struct peerd *peer, int argc, char *argv[])
{
        int ret = 0;
        int i;
        struct pfiled *pfiled = malloc(sizeof(struct pfiled));
        if (!pfiled){
                XSEGLOG2(&lc, E, "Out of memory");
                ret = -ENOMEM;
                goto out;
        }
        peer->priv = pfiled;

        pfiled->maxfds = 2 * peer->nr_ops;
        pfiled->fdcache = calloc(pfiled->maxfds, sizeof(struct fdcache_node));
        if(!pfiled->fdcache) {
                XSEGLOG2(&lc, E, "Out of memory");
                ret = -ENOMEM;
                goto out;
        }

        for (i = 0; i < peer->nr_ops; i++) {
                peer->peer_reqs[i].priv = malloc(sizeof(struct fio));
                if (!peer->peer_reqs[i].priv){
                        XSEGLOG2(&lc, E, "Out of memory");
                        ret = -ENOMEM;
                        goto out;
                }
        }

        pfiled->handled_reqs = 0;
        pfiled->path[0] = 0;
        pfiled->vpath[0] = 0;
        for (i = 0; i < argc; i++) {
                if (!strcmp(argv[i], "--path") && (i+1) < argc){
                        strncpy(pfiled->path, argv[i+1], MAX_PATH_SIZE);
                        pfiled->path[MAX_PATH_SIZE] = 0;
                        i += 1;
                        continue;
                }
                if (!strcmp(argv[i], "--vpath") && (i+1) < argc){
                        strncpy(pfiled->vpath, argv[i+1], MAX_PATH_SIZE);
                        pfiled->vpath[MAX_PATH_SIZE] = 0;
                        i += 1;
                        continue;
                }
        }

        if (!pfiled->path[0] || !pfiled->vpath[0]) {
                XSEGLOG2(&lc, E, "Both --path and --vpath must be provided");
                ret = -EINVAL;
                goto out;
        }

        pfiled->path_len = strlen(pfiled->path);
        if (pfiled->path[pfiled->path_len - 1] != '/'){
                pfiled->path[pfiled->path_len] = '/';
                pfiled->path[++pfiled->path_len] = 0;
        }

        pfiled->vpath_len = strlen(pfiled->vpath);
        if (pfiled->vpath[pfiled->vpath_len - 1] != '/'){
                pfiled->vpath[pfiled->vpath_len] = '/';
                pfiled->vpath[++pfiled->vpath_len] = 0;
        }

        /* initialize every cache slot, not just the first nr_ops */
        for (i = 0; i < pfiled->maxfds; i++) {
                pthread_cond_init(&pfiled->fdcache[i].cond, NULL);
                pfiled->fdcache[i].flags = READY;
        }
        pthread_mutex_init(&pfiled->cache_lock, NULL);

out:
        return ret;
}

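/*
 * Strict atoi: returns the parsed value only if the whole string is a valid
 * decimal number, -1 otherwise.
 */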
static int safe_atoi(char *s)
{
        long l;
        char *endp;

        l = strtol(s, &endp, 10);
        if (s != endp && *endp == '\0')
                return l;
        else
                return -1;
}