URI:
       tmux.c - plan9port - [fork] Plan 9 from user space
  HTML git clone git://src.adamsgaard.dk/plan9port
   DIR Log
   DIR Files
   DIR Refs
   DIR README
   DIR LICENSE
       ---
       tmux.c (5119B)
       ---
            1 /* Copyright (C) 2003-2006 Russ Cox, Massachusetts Institute of Technology */
            2 /* See COPYRIGHT */
            3 
            4 /*
            5  * Generic RPC packet multiplexor.  Inspired by but not derived from
            6  * Plan 9 kernel.  Originally developed as part of Tra, later used in
            7  * libnventi, and then finally split out into a generic library.
            8  */
            9 
           10 #include <u.h>
           11 #include <libc.h>
           12 #include <mux.h>
           13 
           14 static int gettag(Mux*, Muxrpc*);
           15 static void puttag(Mux*, Muxrpc*);
           16 static void enqueue(Mux*, Muxrpc*);
           17 static void dequeue(Mux*, Muxrpc*);
           18 
           19 void
           20 muxinit(Mux *mux)
           21 {
           22         memset(&mux->lk, 0, sizeof(Mux)-offsetof(Mux, lk));
           23         mux->tagrend.l = &mux->lk;
           24         mux->rpcfork.l = &mux->lk;
           25         mux->sleep.next = &mux->sleep;
           26         mux->sleep.prev = &mux->sleep;
           27 }
           28 
           29 static Muxrpc*
           30 allocmuxrpc(Mux *mux)
           31 {
           32         Muxrpc *r;
           33 
           34         /* must malloc because stack could be private */
           35         r = mallocz(sizeof(Muxrpc), 1);
           36         if(r == nil){
           37                 werrstr("mallocz: %r");
           38                 return nil;
           39         }
           40         r->mux = mux;
           41         r->r.l = &mux->lk;
           42         r->waiting = 1;
           43 
           44         return r;
           45 }
           46 
           47 static int
           48 tagmuxrpc(Muxrpc *r, void *tx)
           49 {
           50         int tag;
           51         Mux *mux;
           52 
           53         mux = r->mux;
           54         /* assign the tag, add selves to response queue */
           55         qlock(&mux->lk);
           56         tag = gettag(mux, r);
           57 /*print("gettag %p %d\n", r, tag); */
           58         enqueue(mux, r);
           59         qunlock(&mux->lk);
           60 
           61         /* actually send the packet */
           62         if(tag < 0 || mux->settag(mux, tx, tag) < 0 || _muxsend(mux, tx) < 0){
           63                 werrstr("settag/send tag %d: %r", tag);
           64                 fprint(2, "%r\n");
           65                 qlock(&mux->lk);
           66                 dequeue(mux, r);
           67                 puttag(mux, r);
           68                 qunlock(&mux->lk);
           69                 return -1;
           70         }
           71         return 0;
           72 }
           73 
           74 void
           75 muxmsgandqlock(Mux *mux, void *p)
           76 {
           77         int tag;
           78         Muxrpc *r2;
           79 
           80         tag = mux->gettag(mux, p) - mux->mintag;
           81 /*print("mux tag %d\n", tag); */
           82         qlock(&mux->lk);
           83         /* hand packet to correct sleeper */
           84         if(tag < 0 || tag >= mux->mwait){
           85                 fprint(2, "%s: bad rpc tag %ux\n", argv0, tag);
           86                 /* must leak packet! don't know how to free it! */
           87                 return;
           88         }
           89         r2 = mux->wait[tag];
           90         if(r2 == nil || r2->prev == nil){
           91                 fprint(2, "%s: bad rpc tag %ux (no one waiting on that tag)\n", argv0, tag);
           92                 /* must leak packet! don't know how to free it! */
           93                 return;
           94         }
           95         r2->p = p;
           96         dequeue(mux, r2);
           97         rwakeup(&r2->r);
           98 }
           99 
          100 void
          101 electmuxer(Mux *mux)
          102 {
          103         Muxrpc *rpc;
          104 
          105         /* if there is anyone else sleeping, wake them to mux */
          106         for(rpc=mux->sleep.next; rpc != &mux->sleep; rpc = rpc->next){
          107                 if(!rpc->async){
          108                         mux->muxer = rpc;
          109                         rwakeup(&rpc->r);
          110                         return;
          111                 }
          112         }
          113         mux->muxer = nil;
          114 }
          115 
          116 void*
          117 muxrpc(Mux *mux, void *tx)
          118 {
          119         int tag;
          120         Muxrpc *r;
          121         void *p;
          122 
          123         if((r = allocmuxrpc(mux)) == nil)
          124                 return nil;
          125 
          126         if((tag = tagmuxrpc(r, tx)) < 0)
          127                 return nil;
          128 
          129         qlock(&mux->lk);
          130         /* wait for our packet */
          131         while(mux->muxer && mux->muxer != r && !r->p)
          132                 rsleep(&r->r);
          133 
          134         /* if not done, there's no muxer: start muxing */
          135         if(!r->p){
          136                 if(mux->muxer != nil && mux->muxer != r)
          137                         abort();
          138                 mux->muxer = r;
          139                 while(!r->p){
          140                         qunlock(&mux->lk);
          141                         _muxrecv(mux, 1, &p);
          142                         if(p == nil){
          143                                 /* eof -- just give up and pass the buck */
          144                                 qlock(&mux->lk);
          145                                 dequeue(mux, r);
          146                                 break;
          147                         }
          148                         muxmsgandqlock(mux, p);
          149                 }
          150                 electmuxer(mux);
          151         }
          152         p = r->p;
          153         puttag(mux, r);
          154         qunlock(&mux->lk);
          155         if(p == nil)
          156                 werrstr("unexpected eof");
          157         return p;
          158 }
          159 
          160 Muxrpc*
          161 muxrpcstart(Mux *mux, void *tx)
          162 {
          163         int tag;
          164         Muxrpc *r;
          165 
          166         if((r = allocmuxrpc(mux)) == nil)
          167                 return nil;
          168         r->async = 1;
          169         if((tag = tagmuxrpc(r, tx)) < 0)
          170                 return nil;
          171         return r;
          172 }
          173 
          174 int
          175 muxrpccanfinish(Muxrpc *r, void **vp)
          176 {
          177         void *p;
          178         Mux *mux;
          179         int ret;
          180 
          181         mux = r->mux;
          182         qlock(&mux->lk);
          183         ret = 1;
          184         if(!r->p && !mux->muxer){
          185                 mux->muxer = r;
          186                 while(!r->p){
          187                         qunlock(&mux->lk);
          188                         p = nil;
          189                         if(!_muxrecv(mux, 0, &p))
          190                                 ret = 0;
          191                         if(p == nil){
          192                                 qlock(&mux->lk);
          193                                 break;
          194                         }
          195                         muxmsgandqlock(mux, p);
          196                 }
          197                 electmuxer(mux);
          198         }
          199         p = r->p;
          200         if(p)
          201                 puttag(mux, r);
          202         qunlock(&mux->lk);
          203         *vp = p;
          204         return ret;
          205 }
          206 
          207 static void
          208 enqueue(Mux *mux, Muxrpc *r)
          209 {
          210         r->next = mux->sleep.next;
          211         r->prev = &mux->sleep;
          212         r->next->prev = r;
          213         r->prev->next = r;
          214 }
          215 
          216 static void
          217 dequeue(Mux *mux, Muxrpc *r)
          218 {
          219         r->next->prev = r->prev;
          220         r->prev->next = r->next;
          221         r->prev = nil;
          222         r->next = nil;
          223 }
          224 
          225 static int
          226 gettag(Mux *mux, Muxrpc *r)
          227 {
          228         int i, mw;
          229         Muxrpc **w;
          230 
          231         for(;;){
          232                 /* wait for a free tag */
          233                 while(mux->nwait == mux->mwait){
          234                         if(mux->mwait < mux->maxtag-mux->mintag){
          235                                 mw = mux->mwait;
          236                                 if(mw == 0)
          237                                         mw = 1;
          238                                 else
          239                                         mw <<= 1;
          240                                 w = realloc(mux->wait, mw*sizeof(w[0]));
          241                                 if(w == nil)
          242                                         return -1;
          243                                 memset(w+mux->mwait, 0, (mw-mux->mwait)*sizeof(w[0]));
          244                                 mux->wait = w;
          245                                 mux->freetag = mux->mwait;
          246                                 mux->mwait = mw;
          247                                 break;
          248                         }
          249                         rsleep(&mux->tagrend);
          250                 }
          251 
          252                 i=mux->freetag;
          253                 if(mux->wait[i] == 0)
          254                         goto Found;
          255                 for(; i<mux->mwait; i++)
          256                         if(mux->wait[i] == 0)
          257                                 goto Found;
          258                 for(i=0; i<mux->freetag; i++)
          259                         if(mux->wait[i] == 0)
          260                                 goto Found;
          261                 /* should not fall out of while without free tag */
          262                 fprint(2, "libfs: nwait botch\n");
          263                 abort();
          264         }
          265 
          266 Found:
          267         mux->nwait++;
          268         mux->wait[i] = r;
          269         r->tag = i+mux->mintag;
          270         return r->tag;
          271 }
          272 
          273 static void
          274 puttag(Mux *mux, Muxrpc *r)
          275 {
          276         int i;
          277 
          278         i = r->tag - mux->mintag;
          279         assert(mux->wait[i] == r);
          280         mux->wait[i] = nil;
          281         mux->nwait--;
          282         mux->freetag = i;
          283         rwakeup(&mux->tagrend);
          284         free(r);
          285 }