add finalize function to peers and make mapper close all maps before quitting
[archipelago] / xseg / peers / user / monitor.c
1 #include <stdio.h>
2 #include <unistd.h>
3 #include <sys/types.h>
4 #include <pthread.h>
5 #include <xseg/xseg.h>
6 #include <peer.h>
7 #include <sys/time.h>
8
9 #define INPUT_BUF_SIZE 256
10 #define MAX_NR_ARGS 100
11
12 struct monitord {
13         uint32_t mon_portno;
14 };
15
16 struct monitor_io {
17         uint32_t src_portno;
18         void *src_priv;
19 };
20
21 void custom_peer_usage()
22 {
23         return;
24 }
25
26 static int forward(struct peerd *peer, struct peer_req *pr)
27 {
28         int r;
29         r = submit_peer_req(peer, pr);
30         if (r < 0) {
31                 printf("couldn't forward request");
32                 return -1;
33         }
34         return 0;
35 }
36
37 static int complete_forwarded(struct peerd *peer, struct peer_req *pr)
38 {
39         struct xseg_request *req = pr->req;
40
41         // assert mio->src_portno != NoPort
42         if (req->state & XS_SERVED)
43                 complete(peer, pr);
44         else if (req->state & XS_FAILED)
45                 fail (peer, pr);
46         else {
47                 printf("invalid state\n");
48                 return -1;
49         }
50         return 0;
51 }
52
53 int dispatch(struct peerd *peer, struct peer_req *pr, struct xseg_request *xreq,
54                 enum dispatch_reason reason)
55 {
56         struct xseg_request *req = pr->req;
57         if (req->state & (XS_SERVED | XS_FAILED)){
58                 log_pr("completing", pr);
59                 complete_forwarded(peer, pr);
60         }
61         else {
62                 log_pr("forwarding", pr);
63                 forward(peer,pr);
64         }
65         return 0;
66 }
67
68 int mpause(struct peerd *peer)
69 {
70         struct xseg *xseg = peer->xseg;
71         struct xseg_port *port = xseg_get_port(xseg, peer->portno_start);
72         if (!port)
73                 return -1;
74         
75         xlock_acquire(&port->rq_lock, peer->portno_start);
76         xlock_acquire(&port->pq_lock, peer->portno_start);
77         return 0;
78 }
79
80 int munpause(struct peerd *peer)
81 {
82         struct xseg *xseg = peer->xseg;
83         struct xseg_port *port = xseg_get_port(xseg, peer->portno_start);
84         if (!port)
85                 return -1;
86         
87         xlock_release(&port->rq_lock);
88         xlock_release(&port->pq_lock);
89         return 0;
90 }
91
92 struct peerd *main_peer;
93
94 void main_loop(void)
95 {
96         int ret;
97         struct peerd * peer = main_peer;
98         char buf[INPUT_BUF_SIZE];
99         char *nl;
100
101         unsigned int portno = NoPort, dstgw, srcgw;
102
103         for (;;){
104                 printf("waitin next line\n");
105                 if (fgets(buf, INPUT_BUF_SIZE, stdin)) {
106                         nl = strchr(buf, '\n');
107                         if (nl)
108                                 *nl = 0;
109                         buf[INPUT_BUF_SIZE -1] = 0;
110                         printf("got line input\n");
111                         ret = sscanf(buf, "set srcgw %u %u", &portno, &srcgw);
112                         if (ret == 2){
113                                 printf("found setsrcgw\n");
114                                 xseg_set_srcgw(peer->xseg, (uint32_t) portno, (uint32_t) srcgw);
115                                 continue;
116                         };
117                         ret = sscanf(buf, "set dstgw %u %u", &portno, &dstgw);
118                         if (ret == 2){
119                                 printf("found set dstgw\n");
120                                 xseg_set_dstgw(peer->xseg, (uint32_t) portno, (uint32_t) dstgw);
121                                 continue;
122                         };
123                         ret = sscanf(buf, "getandset srcgw %u %u", &portno, &srcgw);
124                         if (ret == 2){
125                                 printf("found getand set srcgw\n");
126                                 xseg_getandset_srcgw(peer->xseg, (uint32_t) portno, (uint32_t) srcgw);
127                                 continue;
128                         };
129                         ret = sscanf(buf, "getandset dstgw %u %u", &portno, &dstgw);
130                         if (ret == 2){
131                                 printf("found getandset dstgw\n");
132                                 xseg_getandset_dstgw(peer->xseg, (uint32_t) portno, (uint32_t) dstgw);
133                                 continue;
134                         };
135                         ret = sscanf(buf, "pause %u", &portno);
136                         if (ret == 1){
137                                 printf("found pause\n");
138                                 mpause(peer);
139                                 continue;
140                         };
141                         ret = sscanf(buf, "unpause %u", &portno);
142                         if (ret == 1){
143                                 printf("found unpause\n");
144                                 munpause(peer);
145                                 continue;
146                         };
147                 }
148                 else
149                         exit(0);
150         }
151 }
152
153 int custom_peer_init(struct peerd *peer, int argc, char *argv[])
154 {
155         int i;
156         struct monitor_io *mio;
157         struct monitord *monitor;
158
159         monitor = malloc(sizeof(struct monitord));
160         if (!monitor)
161                 return -1;
162         peer->priv = monitor;
163         monitor->mon_portno = NoPort;
164         
165         
166         for (i = 0; i < peer->nr_ops; i++) {
167                 mio = malloc(sizeof(struct monitor_io));
168                 if (!mio)
169                         return -1;
170                 peer->peer_reqs[i].priv = mio;
171                 mio->src_portno = NoPort;
172         }
173         
174         for (i = 1; i < argc; i++) {
175                 if (!strcmp(argv[i], "-mp") && (i + 1 < argc)) {
176                         monitor->mon_portno = atoi(argv[i+1]);
177                         i+=1;
178                         continue;
179                 }
180         }
181         main_peer = peer;
182
183         peer->interactive_func = main_loop;
184
185         return 0;
186 }
187
188 void custom_peer_finalize(struct peerd *peer)
189 {
190         return;
191 }