tmux.c - plan9port - [fork] Plan 9 from user space
HTML git clone git://src.adamsgaard.dk/plan9port
DIR Log
DIR Files
DIR Refs
DIR README
DIR LICENSE
---
tmux.c (5119B)
---
1 /* Copyright (C) 2003-2006 Russ Cox, Massachusetts Institute of Technology */
2 /* See COPYRIGHT */
3
4 /*
5 * Generic RPC packet multiplexor. Inspired by but not derived from
6 * Plan 9 kernel. Originally developed as part of Tra, later used in
7 * libnventi, and then finally split out into a generic library.
8 */
9
10 #include <u.h>
11 #include <libc.h>
12 #include <mux.h>
13
14 static int gettag(Mux*, Muxrpc*);
15 static void puttag(Mux*, Muxrpc*);
16 static void enqueue(Mux*, Muxrpc*);
17 static void dequeue(Mux*, Muxrpc*);
18
19 void
20 muxinit(Mux *mux)
21 {
22 memset(&mux->lk, 0, sizeof(Mux)-offsetof(Mux, lk));
23 mux->tagrend.l = &mux->lk;
24 mux->rpcfork.l = &mux->lk;
25 mux->sleep.next = &mux->sleep;
26 mux->sleep.prev = &mux->sleep;
27 }
28
29 static Muxrpc*
30 allocmuxrpc(Mux *mux)
31 {
32 Muxrpc *r;
33
34 /* must malloc because stack could be private */
35 r = mallocz(sizeof(Muxrpc), 1);
36 if(r == nil){
37 werrstr("mallocz: %r");
38 return nil;
39 }
40 r->mux = mux;
41 r->r.l = &mux->lk;
42 r->waiting = 1;
43
44 return r;
45 }
46
47 static int
48 tagmuxrpc(Muxrpc *r, void *tx)
49 {
50 int tag;
51 Mux *mux;
52
53 mux = r->mux;
54 /* assign the tag, add selves to response queue */
55 qlock(&mux->lk);
56 tag = gettag(mux, r);
57 /*print("gettag %p %d\n", r, tag); */
58 enqueue(mux, r);
59 qunlock(&mux->lk);
60
61 /* actually send the packet */
62 if(tag < 0 || mux->settag(mux, tx, tag) < 0 || _muxsend(mux, tx) < 0){
63 werrstr("settag/send tag %d: %r", tag);
64 fprint(2, "%r\n");
65 qlock(&mux->lk);
66 dequeue(mux, r);
67 puttag(mux, r);
68 qunlock(&mux->lk);
69 return -1;
70 }
71 return 0;
72 }
73
74 void
75 muxmsgandqlock(Mux *mux, void *p)
76 {
77 int tag;
78 Muxrpc *r2;
79
80 tag = mux->gettag(mux, p) - mux->mintag;
81 /*print("mux tag %d\n", tag); */
82 qlock(&mux->lk);
83 /* hand packet to correct sleeper */
84 if(tag < 0 || tag >= mux->mwait){
85 fprint(2, "%s: bad rpc tag %ux\n", argv0, tag);
86 /* must leak packet! don't know how to free it! */
87 return;
88 }
89 r2 = mux->wait[tag];
90 if(r2 == nil || r2->prev == nil){
91 fprint(2, "%s: bad rpc tag %ux (no one waiting on that tag)\n", argv0, tag);
92 /* must leak packet! don't know how to free it! */
93 return;
94 }
95 r2->p = p;
96 dequeue(mux, r2);
97 rwakeup(&r2->r);
98 }
99
100 void
101 electmuxer(Mux *mux)
102 {
103 Muxrpc *rpc;
104
105 /* if there is anyone else sleeping, wake them to mux */
106 for(rpc=mux->sleep.next; rpc != &mux->sleep; rpc = rpc->next){
107 if(!rpc->async){
108 mux->muxer = rpc;
109 rwakeup(&rpc->r);
110 return;
111 }
112 }
113 mux->muxer = nil;
114 }
115
116 void*
117 muxrpc(Mux *mux, void *tx)
118 {
119 int tag;
120 Muxrpc *r;
121 void *p;
122
123 if((r = allocmuxrpc(mux)) == nil)
124 return nil;
125
126 if((tag = tagmuxrpc(r, tx)) < 0)
127 return nil;
128
129 qlock(&mux->lk);
130 /* wait for our packet */
131 while(mux->muxer && mux->muxer != r && !r->p)
132 rsleep(&r->r);
133
134 /* if not done, there's no muxer: start muxing */
135 if(!r->p){
136 if(mux->muxer != nil && mux->muxer != r)
137 abort();
138 mux->muxer = r;
139 while(!r->p){
140 qunlock(&mux->lk);
141 _muxrecv(mux, 1, &p);
142 if(p == nil){
143 /* eof -- just give up and pass the buck */
144 qlock(&mux->lk);
145 dequeue(mux, r);
146 break;
147 }
148 muxmsgandqlock(mux, p);
149 }
150 electmuxer(mux);
151 }
152 p = r->p;
153 puttag(mux, r);
154 qunlock(&mux->lk);
155 if(p == nil)
156 werrstr("unexpected eof");
157 return p;
158 }
159
160 Muxrpc*
161 muxrpcstart(Mux *mux, void *tx)
162 {
163 int tag;
164 Muxrpc *r;
165
166 if((r = allocmuxrpc(mux)) == nil)
167 return nil;
168 r->async = 1;
169 if((tag = tagmuxrpc(r, tx)) < 0)
170 return nil;
171 return r;
172 }
173
174 int
175 muxrpccanfinish(Muxrpc *r, void **vp)
176 {
177 void *p;
178 Mux *mux;
179 int ret;
180
181 mux = r->mux;
182 qlock(&mux->lk);
183 ret = 1;
184 if(!r->p && !mux->muxer){
185 mux->muxer = r;
186 while(!r->p){
187 qunlock(&mux->lk);
188 p = nil;
189 if(!_muxrecv(mux, 0, &p))
190 ret = 0;
191 if(p == nil){
192 qlock(&mux->lk);
193 break;
194 }
195 muxmsgandqlock(mux, p);
196 }
197 electmuxer(mux);
198 }
199 p = r->p;
200 if(p)
201 puttag(mux, r);
202 qunlock(&mux->lk);
203 *vp = p;
204 return ret;
205 }
206
207 static void
208 enqueue(Mux *mux, Muxrpc *r)
209 {
210 r->next = mux->sleep.next;
211 r->prev = &mux->sleep;
212 r->next->prev = r;
213 r->prev->next = r;
214 }
215
216 static void
217 dequeue(Mux *mux, Muxrpc *r)
218 {
219 r->next->prev = r->prev;
220 r->prev->next = r->next;
221 r->prev = nil;
222 r->next = nil;
223 }
224
225 static int
226 gettag(Mux *mux, Muxrpc *r)
227 {
228 int i, mw;
229 Muxrpc **w;
230
231 for(;;){
232 /* wait for a free tag */
233 while(mux->nwait == mux->mwait){
234 if(mux->mwait < mux->maxtag-mux->mintag){
235 mw = mux->mwait;
236 if(mw == 0)
237 mw = 1;
238 else
239 mw <<= 1;
240 w = realloc(mux->wait, mw*sizeof(w[0]));
241 if(w == nil)
242 return -1;
243 memset(w+mux->mwait, 0, (mw-mux->mwait)*sizeof(w[0]));
244 mux->wait = w;
245 mux->freetag = mux->mwait;
246 mux->mwait = mw;
247 break;
248 }
249 rsleep(&mux->tagrend);
250 }
251
252 i=mux->freetag;
253 if(mux->wait[i] == 0)
254 goto Found;
255 for(; i<mux->mwait; i++)
256 if(mux->wait[i] == 0)
257 goto Found;
258 for(i=0; i<mux->freetag; i++)
259 if(mux->wait[i] == 0)
260 goto Found;
261 /* should not fall out of while without free tag */
262 fprint(2, "libfs: nwait botch\n");
263 abort();
264 }
265
266 Found:
267 mux->nwait++;
268 mux->wait[i] = r;
269 r->tag = i+mux->mintag;
270 return r->tag;
271 }
272
273 static void
274 puttag(Mux *mux, Muxrpc *r)
275 {
276 int i;
277
278 i = r->tag - mux->mintag;
279 assert(mux->wait[i] == r);
280 mux->wait[i] = nil;
281 mux->nwait--;
282 mux->freetag = i;
283 rwakeup(&mux->tagrend);
284 free(r);
285 }