URI:
       trpc.c - plan9port - [fork] Plan 9 from user space
  HTML git clone git://src.adamsgaard.dk/plan9port
   DIR Log
   DIR Files
   DIR Refs
   DIR README
   DIR LICENSE
       ---
       trpc.c (3248B)
       ---
            1 /*
            2  * Multiplexed Venti client.  It would be nice if we
            3  * could turn this into a generic library routine rather
            4  * than keep it Venti specific.  A user-level 9P client
            5  * could use something like this too.
            6  *
            7  * (Actually it does - this should be replaced with libmux,
            8  * which should be renamed librpcmux.)
            9  *
           10  * This is a little more complicated than it might be
           11  * because we want it to work well within and without libthread.
           12  *
           13  * The mux code is inspired by tra's, which is inspired by the Plan 9 kernel.
           14  */
           15 
           16 #include <u.h>
           17 #include <libc.h>
           18 #include <venti.h>
           19 
           20 typedef struct Rwait Rwait;
           21 struct Rwait
           22 {
           23         Rendez r;
           24         Packet *p;
           25         int done;
           26         int sleeping;
           27 };
           28 
           29 static int gettag(VtConn*, Rwait*);
           30 static void puttag(VtConn*, Rwait*, int);
           31 static void muxrpc(VtConn*, Packet*);
           32 
           33 Packet*
           34 _vtrpc(VtConn *z, Packet *p, VtFcall *tx)
           35 {
           36         int i;
           37         uchar tag, buf[2], *top;
           38         Rwait *r, *rr;
           39 
           40         if(z == nil){
           41                 werrstr("not connected");
           42                 packetfree(p);
           43                 return nil;
           44         }
           45 
           46         /* must malloc because stack could be private */
           47         r = vtmallocz(sizeof(Rwait));
           48 
           49         qlock(&z->lk);
           50         r->r.l = &z->lk;
           51         tag = gettag(z, r);
           52         if(tx){
           53                 /* vtfcallrpc can't print packet because it doesn't have tag */
           54                 tx->tag = tag;
           55                 if(chattyventi)
           56                         fprint(2, "%s -> %F\n", argv0, tx);
           57         }
           58 
           59         /* slam tag into packet */
           60         top = packetpeek(p, buf, 0, 2);
           61         if(top == nil){
           62                 packetfree(p);
           63                 return nil;
           64         }
           65         if(top == buf){
           66                 werrstr("first two bytes must be in same packet fragment");
           67                 packetfree(p);
           68                 vtfree(r);
           69                 return nil;
           70         }
           71         top[1] = tag;
           72         qunlock(&z->lk);
           73         if(vtsend(z, p) < 0){
           74                 vtfree(r);
           75                 return nil;
           76         }
           77 
           78         qlock(&z->lk);
           79         /* wait for the muxer to give us our packet */
           80         r->sleeping = 1;
           81         z->nsleep++;
           82         while(z->muxer && !r->done)
           83                 rsleep(&r->r);
           84         z->nsleep--;
           85         r->sleeping = 0;
           86 
           87         /* if not done, there's no muxer: start muxing */
           88         if(!r->done){
           89                 if(z->muxer)
           90                         abort();
           91                 z->muxer = 1;
           92                 while(!r->done){
           93                         qunlock(&z->lk);
           94                         if((p = vtrecv(z)) == nil){
           95                                 werrstr("unexpected eof on venti connection");
           96                                 z->muxer = 0;
           97                                 vtfree(r);
           98                                 return nil;
           99                         }
          100                         qlock(&z->lk);
          101                         muxrpc(z, p);
          102                 }
          103                 z->muxer = 0;
          104                 /* if there is anyone else sleeping, wake first unfinished to mux */
          105                 if(z->nsleep)
          106                 for(i=0; i<256; i++){
          107                         rr = z->wait[i];
          108                         if(rr && rr->sleeping && !rr->done){
          109                                 rwakeup(&rr->r);
          110                                 break;
          111                         }
          112                 }
          113         }
          114 
          115         p = r->p;
          116         puttag(z, r, tag);
          117         vtfree(r);
          118         qunlock(&z->lk);
          119         return p;
          120 }
          121 
          122 Packet*
          123 vtrpc(VtConn *z, Packet *p)
          124 {
          125         return _vtrpc(z, p, nil);
          126 }
          127 
          128 static int
          129 gettag(VtConn *z, Rwait *r)
          130 {
          131         int i;
          132 
          133 Again:
          134         while(z->ntag == 256)
          135                 rsleep(&z->tagrend);
          136         for(i=0; i<256; i++)
          137                 if(z->wait[i] == 0){
          138                         z->ntag++;
          139                         z->wait[i] = r;
          140                         return i;
          141                 }
          142         fprint(2, "libventi: ntag botch\n");
          143         goto Again;
          144 }
          145 
          146 static void
          147 puttag(VtConn *z, Rwait *r, int tag)
          148 {
          149         assert(z->wait[tag] == r);
          150         z->wait[tag] = nil;
          151         z->ntag--;
          152         rwakeup(&z->tagrend);
          153 }
          154 
          155 static void
          156 muxrpc(VtConn *z, Packet *p)
          157 {
          158         uchar tag, buf[2], *top;
          159         Rwait *r;
          160 
          161         if((top = packetpeek(p, buf, 0, 2)) == nil){
          162                 fprint(2, "libventi: short packet in vtrpc\n");
          163                 packetfree(p);
          164                 return;
          165         }
          166 
          167         tag = top[1];
          168         if((r = z->wait[tag]) == nil){
          169                 fprint(2, "libventi: unexpected packet tag %d in vtrpc\n", tag);
          170 abort();
          171                 packetfree(p);
          172                 return;
          173         }
          174 
          175         r->p = p;
          176         r->done = 1;
          177         rwakeup(&r->r);
          178 }