Improve pthread signaling.
[archipelago] / xseg / drivers / user / xseg_pthread.c
1 /*
2  * Copyright 2012 GRNET S.A. All rights reserved.
3  *
4  * Redistribution and use in source and binary forms, with or
5  * without modification, are permitted provided that the following
6  * conditions are met:
7  *
8  *   1. Redistributions of source code must retain the above
9  *      copyright notice, this list of conditions and the following
10  *      disclaimer.
11  *   2. Redistributions in binary form must reproduce the above
12  *      copyright notice, this list of conditions and the following
13  *      disclaimer in the documentation and/or other materials
14  *      provided with the distribution.
15  *
16  * THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17  * OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18  * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19  * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20  * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21  * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22  * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23  * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24  * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25  * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26  * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27  * POSSIBILITY OF SUCH DAMAGE.
28  *
29  * The views and conclusions contained in the software and
30  * documentation are those of the authors and should not be
31  * interpreted as representing official policies, either expressed
32  * or implied, of GRNET S.A.
33  */
34
35 #define _GNU_SOURCE
36 #include <stdio.h>
37 #include <stdlib.h>
38 #include <unistd.h>
39 #include <sys/types.h>
40 #include <sys/stat.h>
41 #include <sys/mman.h>
42 #include <sys/syscall.h>
43 #include <fcntl.h>
44 #include <errno.h>
45 #include <string.h>
46 #include <signal.h>
47 #include <sys/util.h>
48 #include <xseg/xseg.h>
49 #include <pthread.h>
50 #include <drivers/xseg_pthread.h>
51 #define ERRSIZE 512
52 char errbuf[ERRSIZE];
53
54 static void *pthread_malloc(uint64_t size);
55 static void pthread_mfree(void *mem);
56
57 static long pthread_allocate(const char *name, uint64_t size)
58 {
59         int fd, r;
60         fd = shm_open(name, O_RDWR | O_CREAT | O_EXCL, 0770);
61         if (fd < 0) {
62                 XSEGLOG("Cannot create shared segment: %s\n",
63                         strerror_r(errno, errbuf, ERRSIZE));
64                 return fd;
65         }
66
67         r = lseek(fd, size -1, SEEK_SET);
68         if (r < 0) {
69                 close(fd);
70                 XSEGLOG("Cannot seek into segment file: %s\n",
71                         strerror_r(errno, errbuf, ERRSIZE));
72                 return r;
73         }
74
75         errbuf[0] = 0;
76         r = write(fd, errbuf, 1);
77         if (r != 1) {
78                 close(fd);
79                 XSEGLOG("Failed to set segment size: %s\n",
80                         strerror_r(errno, errbuf, ERRSIZE));
81                 return r;
82         }
83
84         close(fd);
85         return 0;
86 }
87
88 static long pthread_deallocate(const char *name)
89 {
90         return shm_unlink(name);
91 }
92
93 static void *pthread_map(const char *name, uint64_t size, struct xseg *seg)
94 {
95         struct xseg *xseg;
96         int fd;
97
98         if (seg)
99                 XSEGLOG("struct xseg * is not NULL. Ignoring...\n");
100
101         fd = shm_open(name, O_RDWR, 0000);
102         if (fd < 0) {
103                 XSEGLOG("Failed to open '%s' for mapping: %s\n",
104                         name, strerror_r(errno, errbuf, ERRSIZE));
105                 return NULL;
106         }
107
108         xseg = mmap (   XSEG_BASE_AS_PTR,
109                         size,
110                         PROT_READ | PROT_WRITE,
111                         MAP_SHARED | MAP_FIXED /* | MAP_LOCKED */,
112                         fd, 0   );
113
114         if (xseg == MAP_FAILED) {
115                 XSEGLOG("Could not map segment: %s\n",
116                         strerror_r(errno, errbuf, ERRSIZE));
117                 return NULL;
118         }
119
120         close(fd);
121         return xseg;
122 }
123
124 static void pthread_unmap(void *ptr, uint64_t size)
125 {
126         struct xseg *xseg = ptr;
127         (void)munmap(xseg, xseg->segment_size);
128 }
129
130
131 static void handler(int signum)
132 {
133         static unsigned long counter;
134         printf("%lu: signal %d: this shouldn't have happened.\n", counter, signum);
135         counter ++;
136 }
137
138 static pthread_key_t pid_key, xpidx_key;
139 static pthread_key_t mask_key, act_key;
140 static pthread_key_t id_key;
141 static pthread_once_t once_init = PTHREAD_ONCE_INIT;
142 static pthread_once_t once_quit = PTHREAD_ONCE_INIT;
143 static int isInit;
144 static volatile int id = 0;
145
146 static void keys_init(void)
147 {
148         int r;
149
150         r = pthread_key_create(&pid_key, NULL);
151         if (r < 0) {
152                 isInit = 0;
153                 return;
154         }
155
156         r = pthread_key_create(&xpidx_key, NULL);
157         if (r < 0) {
158                 isInit = 0;
159                 return;
160         }
161         r = pthread_key_create(&mask_key, NULL);
162         if (r < 0) {
163                 isInit = 0;
164                 return;
165         }
166
167         r = pthread_key_create(&act_key, NULL);
168         if (r < 0) {
169                 isInit = 0;
170                 return;
171         }
172         r = pthread_key_create(&id_key, NULL);
173         if (r < 0) {
174                 isInit = 0;
175                 return;
176         }
177         isInit = 1;
178         once_quit = PTHREAD_ONCE_INIT;
179 }
180
181 #define INT_TO_POINTER(__myptr, __myint) \
182         do {\
183                 unsigned long __foo____myptr = (unsigned long) __myint; \
184                 __myptr = (void *) __foo____myptr ; \
185         } while (0)
186
187 #define POINTER_TO_INT(__myint, __myptr)\
188         do { \
189                 unsigned long __foo____myint = (unsigned long) __myptr; \
190                 __myint = (int) __foo____myint ; \
191         } while (0)
192
193 /* must be called by each thread */
194 static int pthread_local_signal_init(struct xseg *xseg, xport portno)
195 {
196         int r, my_id;
197         pid_t pid;
198         void *tmp, *tmp2;
199         sigset_t *savedset, *set;
200         struct sigaction *act, *old_act;
201
202         savedset = pthread_malloc(sizeof(sigset_t));
203         if (!savedset)
204                 goto err1;
205         set = pthread_malloc(sizeof(sigset_t));
206         if (!set)
207                 goto err2;
208
209         act = pthread_malloc(sizeof(struct sigaction));
210         if (!act)
211                 goto err3;
212         old_act = pthread_malloc(sizeof(struct sigaction));
213         if (!old_act)
214                 goto err4;
215
216         pthread_once(&once_init, keys_init);
217         if (!isInit)
218                 goto err5;
219
220         sigemptyset(set);
221         act->sa_handler = handler;
222         act->sa_mask = *set;
223         act->sa_flags = 0;
224         if(sigaction(SIGIO, act, old_act) < 0)
225                 goto err5;
226
227         
228         sigaddset(set, SIGIO);
229
230         r = pthread_sigmask(SIG_BLOCK, set, savedset);
231         if (r < 0) 
232                 goto err6;
233
234
235         my_id = *(volatile int *) &id;
236         while (!__sync_bool_compare_and_swap(&id, my_id, my_id+1)){
237                 my_id = *(volatile int *) &id;
238         }
239         pid = syscall(SYS_gettid);
240         INT_TO_POINTER(tmp, pid);
241         INT_TO_POINTER(tmp2, my_id);
242         if (pthread_setspecific(pid_key, tmp) ||
243                         pthread_setspecific(mask_key, savedset) ||
244                         pthread_setspecific(act_key, old_act) ||
245                         pthread_setspecific(id_key, tmp2))
246                 goto err7;
247
248         return 0;
249
250 err7:
251         pthread_sigmask(SIG_BLOCK, savedset, NULL);
252 err6:
253         sigaction(SIGIO, old_act, NULL);
254 err5:
255         pthread_mfree(old_act);
256 err4:
257         pthread_mfree(act);
258 err3:
259         pthread_mfree(set);
260 err2:
261         pthread_mfree(savedset);
262 err1:
263         return -1;
264 }
265
266 /* should be called by each thread which had initialized signals */
267 static void pthread_local_signal_quit(struct xseg *xseg, xport portno)
268 {
269         sigset_t *savedset;
270         struct sigaction *old_act;
271
272         savedset = pthread_getspecific(act_key);
273         old_act = pthread_getspecific(mask_key);
274         if (old_act)
275                 sigaction(SIGIO, old_act, NULL);
276         if (savedset)
277                 pthread_sigmask(SIG_SETMASK, savedset, NULL);
278 }
279
280 static int pthread_remote_signal_init(void)
281 {
282         return 0;
283 }
284
285 static void pthread_remote_signal_quit(void)
286 {
287         return;
288 }
289
290 static int pthread_prepare_wait(struct xseg *xseg, uint32_t portno)
291 {
292         void * tmp;
293         pid_t pid;
294         int my_id;
295         struct xseg_port *port = xseg_get_port(xseg, portno);
296         if (!port) 
297                 return -1;
298         struct pthread_signal_desc *psd = xseg_get_signal_desc(xseg, port);
299         if (!psd)
300                 return -1;
301
302         tmp = pthread_getspecific(pid_key);
303         POINTER_TO_INT(pid, tmp);
304         if (!pid)
305                 return -1;
306         tmp = pthread_getspecific(id_key);
307         POINTER_TO_INT(my_id, tmp);
308         psd->pids[my_id] = pid;
309         return 0;
310 }
311
312 static int pthread_cancel_wait(struct xseg *xseg, uint32_t portno)
313 {
314         void * tmp;
315         int my_id;
316         pid_t pid;
317         struct xseg_port *port = xseg_get_port(xseg, portno);
318         if (!port)
319                 return -1;
320         struct pthread_signal_desc *psd = xseg_get_signal_desc(xseg, port);
321         if (!psd)
322                 return -1;
323
324         tmp = pthread_getspecific(pid_key);
325         POINTER_TO_INT(pid, tmp);
326         if (!pid)
327                 return -1;
328
329         tmp = pthread_getspecific(id_key);
330         POINTER_TO_INT(my_id, tmp);
331         psd->pids[my_id] = 0;
332
333         return 0;
334 }
335
336 static int pthread_wait_signal(struct xseg *xseg, uint32_t usec_timeout)
337 {
338         int r;
339         siginfo_t siginfo;
340         struct timespec ts;
341         sigset_t set;
342         sigemptyset(&set);
343         sigaddset(&set, SIGIO);
344
345         ts.tv_sec = usec_timeout / 1000000;
346         ts.tv_nsec = 1000 * (usec_timeout - ts.tv_sec * 1000000);
347
348         r = sigtimedwait(&set, &siginfo, &ts);
349         if (r < 0)
350                 return r;
351
352         return siginfo.si_signo;
353 }
354
355 static int pthread_signal(struct xseg *xseg, uint32_t portno)
356 {
357         int i;
358
359         struct xseg_port *port = xseg_get_port(xseg, portno);
360         if (!port) 
361                 return -1;
362         struct pthread_signal_desc *psd = xseg_get_signal_desc(xseg, port);
363         if (!psd)
364                 return -1;
365
366         pid_t cue;
367         for (i = 0; i < MAX_WAITERS; i++) {
368                 cue = psd->pids[i];
369                 if (cue)
370                         return syscall(SYS_tkill, cue, SIGIO);
371         }
372
373         /* no waiter found */
374         return 0;
375 }
376
377 static void *pthread_malloc(uint64_t size)
378 {
379         return malloc((size_t)size);
380 }
381
382 static void *pthread_realloc(void *mem, uint64_t size)
383 {
384         return realloc(mem, (size_t)size);
385 }
386
387 static void pthread_mfree(void *mem)
388 {
389         free(mem);
390 }
391
392 static struct xseg_type xseg_pthread = {
393         /* xseg_operations */
394         {
395                 .mfree          = pthread_mfree,
396                 .allocate       = pthread_allocate,
397                 .deallocate     = pthread_deallocate,
398                 .map            = pthread_map,
399                 .unmap          = pthread_unmap,
400         },
401         /* name */
402         "pthread"
403 };
404
405 int pthread_init_signal_desc(struct xseg *xseg, void *sd)
406 {
407         int i;
408         struct pthread_signal_desc *psd = (struct pthread_signal_desc *)sd;
409         for (i = 0; i < MAX_WAITERS; i++) {
410                 psd->pids[i]=0;
411         }
412         return 0;
413 }
414
415 void pthread_quit_signal_desc(struct xseg *xseg, void *sd)
416 {
417         int i;
418         struct pthread_signal_desc *psd = (struct pthread_signal_desc *)sd;
419         for (i = 0; i < MAX_WAITERS; i++) {
420                 psd->pids[i]=0;
421         }
422         return;
423 }
424
425 void * pthread_alloc_data(struct xseg *xseg)
426 {
427         struct xobject_h *sd_h = xseg_get_objh(xseg, MAGIC_PTHREAD_SD,
428                                 sizeof(struct pthread_signal_desc));
429         return sd_h;
430 }
431
432 void pthread_free_data(struct xseg *xseg, void *data)
433 {
434         if (data)
435                 xseg_put_objh(xseg, (struct xobject_h *)data);
436 }
437
438 void *pthread_alloc_signal_desc(struct xseg *xseg, void *data)
439 {
440         struct xobject_h *sd_h = (struct xobject_h *) data;
441         if (!sd_h)
442                 return NULL;
443         struct pthread_signal_desc *psd = xobj_get_obj(sd_h, X_ALLOC);
444         if (!psd)
445                 return NULL;
446         return psd;
447
448 }
449
450 void pthread_free_signal_desc(struct xseg *xseg, void *data, void *sd)
451 {
452         struct xobject_h *sd_h = (struct xobject_h *) data;
453         if (!sd_h)
454                 return;
455         if (sd)
456                 xobj_put_obj(sd_h, sd);
457         return;
458 }
459
460
461 static struct xseg_peer xseg_peer_pthread = {
462         /* xseg_peer_operations */
463         {
464                 .init_signal_desc   = pthread_init_signal_desc,
465                 .quit_signal_desc   = pthread_quit_signal_desc,
466                 .alloc_data         = pthread_alloc_data,
467                 .free_data          = pthread_free_data,
468                 .alloc_signal_desc  = pthread_alloc_signal_desc,
469                 .free_signal_desc   = pthread_free_signal_desc,
470                 .local_signal_init  = pthread_local_signal_init,
471                 .local_signal_quit  = pthread_local_signal_quit,
472                 .remote_signal_init = pthread_remote_signal_init,
473                 .remote_signal_quit = pthread_remote_signal_quit,
474                 .prepare_wait   = pthread_prepare_wait,
475                 .cancel_wait    = pthread_cancel_wait,
476                 .wait_signal    = pthread_wait_signal,
477                 .signal         = pthread_signal,
478                 .malloc         = pthread_malloc,
479                 .realloc        = pthread_realloc,
480                 .mfree          = pthread_mfree,
481         },
482         /* name */
483         "pthread"
484 };
485
486 void xseg_pthread_init(void)
487 {
488         xseg_register_type(&xseg_pthread);
489         xseg_register_peer(&xseg_peer_pthread);
490 }
491