URI:
       tclient.c - plan9port - [fork] Plan 9 from user space
  HTML git clone git://src.adamsgaard.dk/plan9port
   DIR Log
   DIR Files
   DIR Refs
   DIR README
   DIR LICENSE
       ---
       tclient.c (9434B)
       ---
            1 /*
            2  * Sun RPC client.
            3  */
            4 #include <u.h>
            5 #include <libc.h>
            6 #include <thread.h>
            7 #include <sunrpc.h>
            8 
            9 typedef struct Out Out;
           10 struct Out
           11 {
           12         char err[ERRMAX];        /* error string */
           13         Channel *creply;        /* send to finish rpc */
           14         uchar *p;                        /* pending request packet */
           15         int n;                                /* size of request */
           16         ulong tag;                        /* flush tag of pending request */
           17         ulong xid;                        /* xid of pending request */
           18         ulong st;                        /* first send time */
           19         ulong t;                        /* resend time */
           20         int nresend;                /* number of resends */
           21         SunRpc rpc;                /* response rpc */
           22 };
           23 
           24 static void
           25 udpThread(void *v)
           26 {
           27         uchar *p, *buf;
           28         Ioproc *io;
           29         int n;
           30         SunClient *cli;
           31         enum { BufSize = 65536 };
           32 
           33         cli = v;
           34         buf = emalloc(BufSize);
           35         io = ioproc();
           36         p = nil;
           37         for(;;){
           38                 n = ioread(io, cli->fd, buf, BufSize);
           39                 if(n <= 0)
           40                         break;
           41                 p = emalloc(4+n);
           42                 memmove(p+4, buf, n);
           43                 p[0] = n>>24;
           44                 p[1] = n>>16;
           45                 p[2] = n>>8;
           46                 p[3] = n;
           47                 if(sendp(cli->readchan, p) == 0)
           48                         break;
           49                 p = nil;
           50         }
           51         free(p);
           52         closeioproc(io);
           53         while(send(cli->dying, nil) == -1)
           54                 ;
           55 }
           56 
           57 static void
           58 netThread(void *v)
           59 {
           60         uchar *p, buf[4];
           61         Ioproc *io;
           62         uint n, tot;
           63         int done;
           64         SunClient *cli;
           65 
           66         cli = v;
           67         io = ioproc();
           68         tot = 0;
           69         p = nil;
           70         for(;;){
           71                 n = ioreadn(io, cli->fd, buf, 4);
           72                 if(n != 4)
           73                         break;
           74                 n = (buf[0]<<24)|(buf[1]<<16)|(buf[2]<<8)|buf[3];
           75                 if(cli->chatty)
           76                         fprint(2, "%.8ux...", n);
           77                 done = n&0x80000000;
           78                 n &= ~0x80000000;
           79                 if(tot == 0){
           80                         p = emalloc(4+n);
           81                         tot = 4;
           82                 }else
           83                         p = erealloc(p, tot+n);
           84                 if(ioreadn(io, cli->fd, p+tot, n) != n)
           85                         break;
           86                 tot += n;
           87                 if(done){
           88                         p[0] = tot>>24;
           89                         p[1] = tot>>16;
           90                         p[2] = tot>>8;
           91                         p[3] = tot;
           92                         if(sendp(cli->readchan, p) == 0)
           93                                 break;
           94                         p = nil;
           95                         tot = 0;
           96                 }
           97         }
           98         free(p);
           99         closeioproc(io);
          100         while(send(cli->dying, 0) == -1)
          101                 ;
          102 }
          103 
          104 static void
          105 timerThread(void *v)
          106 {
          107         Ioproc *io;
          108         SunClient *cli;
          109 
          110         cli = v;
          111         io = ioproc();
          112         for(;;){
          113                 if(iosleep(io, 200) < 0)
          114                         break;
          115                 if(sendul(cli->timerchan, 0) == 0)
          116                         break;
          117         }
          118         closeioproc(io);
          119         while(send(cli->dying, 0) == -1)
          120                 ;
          121 }
          122 
          123 static ulong
          124 msec(void)
          125 {
          126         return nsec()/1000000;
          127 }
          128 
          129 static ulong
          130 twait(ulong rtt, int nresend)
          131 {
          132         ulong t;
          133 
          134         t = rtt;
          135         if(nresend <= 1)
          136                 {}
          137         else if(nresend <= 3)
          138                 t *= 2;
          139         else if(nresend <= 18)
          140                 t <<= nresend-2;
          141         else
          142                 t = 60*1000;
          143         if(t > 60*1000)
          144                 t = 60*1000;
          145 
          146         return t;
          147 }
          148 
          149 static void
          150 rpcMuxThread(void *v)
          151 {
          152         uchar *buf, *p, *ep;
          153         int i, n, nout, mout;
          154         ulong t, xidgen, tag;
          155         Alt a[5];
          156         Out *o, **out;
          157         SunRpc rpc;
          158         SunClient *cli;
          159 
          160         cli = v;
          161         mout = 16;
          162         nout = 0;
          163         out = emalloc(mout*sizeof(out[0]));
          164         xidgen = truerand();
          165 
          166         a[0].op = CHANRCV;
          167         a[0].c = cli->rpcchan;
          168         a[0].v = &o;
          169         a[1].op = CHANNOP;
          170         a[1].c = cli->timerchan;
          171         a[1].v = nil;
          172         a[2].op = CHANRCV;
          173         a[2].c = cli->flushchan;
          174         a[2].v = &tag;
          175         a[3].op = CHANRCV;
          176         a[3].c = cli->readchan;
          177         a[3].v = &buf;
          178         a[4].op = CHANEND;
          179 
          180         for(;;){
          181                 switch(alt(a)){
          182                 case 0:        /* o = <-rpcchan */
          183                         if(o == nil)
          184                                 goto Done;
          185                         cli->nsend++;
          186                         /* set xid */
          187                         o->xid = ++xidgen;
          188                         if(cli->needcount)
          189                                 p = o->p+4;
          190                         else
          191                                 p = o->p;
          192                         p[0] = xidgen>>24;
          193                         p[1] = xidgen>>16;
          194                         p[2] = xidgen>>8;
          195                         p[3] = xidgen;
          196                         if(write(cli->fd, o->p, o->n) != o->n){
          197                                 free(o->p);
          198                                 o->p = nil;
          199                                 snprint(o->err, sizeof o->err, "write: %r");
          200                                 sendp(o->creply, 0);
          201                                 break;
          202                         }
          203                         if(nout >= mout){
          204                                 mout *= 2;
          205                                 out = erealloc(out, mout*sizeof(out[0]));
          206                         }
          207                         o->st = msec();
          208                         o->nresend = 0;
          209                         o->t = o->st + twait(cli->rtt.avg, 0);
          210 if(cli->chatty) fprint(2, "send %lux %lud %lud\n", o->xid, o->st, o->t);
          211                         out[nout++] = o;
          212                         a[1].op = CHANRCV;
          213                         break;
          214 
          215                 case 1:        /* <-timerchan */
          216                         t = msec();
          217                         for(i=0; i<nout; i++){
          218                                 o = out[i];
          219                                 if((int)(t - o->t) > 0){
          220 if(cli->chatty) fprint(2, "resend %lux %lud %lud\n", o->xid, t, o->t);
          221                                         if(cli->maxwait && t - o->st >= cli->maxwait){
          222                                                 free(o->p);
          223                                                 o->p = nil;
          224                                                 strcpy(o->err, "timeout");
          225                                                 sendp(o->creply, 0);
          226                                                 out[i--] = out[--nout];
          227                                                 continue;
          228                                         }
          229                                         cli->nresend++;
          230                                         o->nresend++;
          231                                         o->t = t + twait(cli->rtt.avg, o->nresend);
          232                                         if(write(cli->fd, o->p, o->n) != o->n){
          233                                                 free(o->p);
          234                                                 o->p = nil;
          235                                                 snprint(o->err, sizeof o->err, "rewrite: %r");
          236                                                 sendp(o->creply, 0);
          237                                                 out[i--] = out[--nout];
          238                                                 continue;
          239                                         }
          240                                 }
          241                         }
          242                         /* stop ticking if no work; rpcchan will turn it back on */
          243                         if(nout == 0)
          244                                 a[1].op = CHANNOP;
          245                         break;
          246 
          247                 case 2:        /* tag = <-flushchan */
          248                         for(i=0; i<nout; i++){
          249                                 o = out[i];
          250                                 if(o->tag == tag){
          251                                         out[i--] = out[--nout];
          252                                         strcpy(o->err, "flushed");
          253                                         free(o->p);
          254                                         o->p = nil;
          255                                         sendp(o->creply, 0);
          256                                 }
          257                         }
          258                         break;
          259 
          260                 case 3:        /* buf = <-readchan */
          261                         p = buf;
          262                         n = (p[0]<<24)|(p[1]<<16)|(p[2]<<8)|p[3];
          263                         p += 4;
          264                         ep = p+n;
          265                         if(sunrpcunpack(p, ep, &p, &rpc) != SunSuccess){
          266                                 fprint(2, "%s: in: %.*H unpack failed\n", argv0, n, buf+4);
          267                                 free(buf);
          268                                 break;
          269                         }
          270                         if(cli->chatty)
          271                                 fprint(2, "in: %B\n", &rpc);
          272                         if(rpc.iscall){
          273                                 fprint(2, "did not get reply\n");
          274                                 free(buf);
          275                                 break;
          276                         }
          277                         o = nil;
          278                         for(i=0; i<nout; i++){
          279                                 o = out[i];
          280                                 if(o->xid == rpc.xid)
          281                                         break;
          282                         }
          283                         if(i==nout){
          284                                 if(cli->chatty) fprint(2, "did not find waiting request\n");
          285                                 free(buf);
          286                                 break;
          287                         }
          288                         out[i] = out[--nout];
          289                         free(o->p);
          290                         o->p = nil;
          291                         o->rpc = rpc;
          292                         if(rpc.status == SunSuccess){
          293                                 o->p = buf;
          294                         }else{
          295                                 o->p = nil;
          296                                 free(buf);
          297                                 sunerrstr(rpc.status);
          298                                 rerrstr(o->err, sizeof o->err);
          299                         }
          300                         sendp(o->creply, 0);
          301                         break;
          302                 }
          303         }
          304 Done:
          305         free(out);
          306         sendp(cli->dying, 0);
          307 }
          308 
          309 SunClient*
          310 sundial(char *address)
          311 {
          312         int fd;
          313         SunClient *cli;
          314 
          315         if((fd = dial(address, 0, 0, 0)) < 0)
          316                 return nil;
          317 
          318         cli = emalloc(sizeof(SunClient));
          319         cli->fd = fd;
          320         cli->maxwait = 15000;
          321         cli->rtt.avg = 1000;
          322         cli->dying = chancreate(sizeof(void*), 0);
          323         cli->rpcchan = chancreate(sizeof(Out*), 0);
          324         cli->timerchan = chancreate(sizeof(ulong), 0);
          325         cli->flushchan = chancreate(sizeof(ulong), 0);
          326         cli->readchan = chancreate(sizeof(uchar*), 0);
          327         if(strstr(address, "udp!")){
          328                 cli->needcount = 0;
          329                 cli->nettid = threadcreate(udpThread, cli, SunStackSize);
          330                 cli->timertid = threadcreate(timerThread, cli, SunStackSize);
          331         }else{
          332                 cli->needcount = 1;
          333                 cli->nettid = threadcreate(netThread, cli, SunStackSize);
          334                 /* assume reliable: don't need timer */
          335                 /* BUG: netThread should know how to redial */
          336         }
          337         threadcreate(rpcMuxThread, cli, SunStackSize);
          338 
          339         return cli;
          340 }
          341 
          342 void
          343 sunclientclose(SunClient *cli)
          344 {
          345         int n;
          346 
          347         /*
          348          * Threadints get you out of any stuck system calls
          349          * or thread rendezvouses, but do nothing if the thread
          350          * is in the ready state.  Keep interrupting until it takes.
          351          */
          352         n = 0;
          353         if(!cli->timertid)
          354                 n++;
          355         while(n < 2){
          356 /*
          357                 threadint(cli->nettid);
          358                 if(cli->timertid)
          359                         threadint(cli->timertid);
          360 */
          361 
          362                 yield();
          363                 while(nbrecv(cli->dying, nil) == 1)
          364                         n++;
          365         }
          366 
          367         sendp(cli->rpcchan, 0);
          368         recvp(cli->dying);
          369 
          370         /* everyone's gone: clean up */
          371         close(cli->fd);
          372         chanfree(cli->flushchan);
          373         chanfree(cli->readchan);
          374         chanfree(cli->timerchan);
          375         free(cli);
          376 }
          377 
          378 void
          379 sunclientflushrpc(SunClient *cli, ulong tag)
          380 {
          381         sendul(cli->flushchan, tag);
          382 }
          383 
          384 void
          385 sunclientprog(SunClient *cli, SunProg *p)
          386 {
          387         if(cli->nprog%16 == 0)
          388                 cli->prog = erealloc(cli->prog, (cli->nprog+16)*sizeof(cli->prog[0]));
          389         cli->prog[cli->nprog++] = p;
          390 }
          391 
          392 int
          393 sunclientrpc(SunClient *cli, ulong tag, SunCall *tx, SunCall *rx, uchar **tofree)
          394 {
          395         uchar *bp, *p, *ep;
          396         int i, n1, n2, n, nn;
          397         Out o;
          398         SunProg *prog;
          399         SunStatus ok;
          400 
          401         for(i=0; i<cli->nprog; i++)
          402                 if(cli->prog[i]->prog == tx->rpc.prog && cli->prog[i]->vers == tx->rpc.vers)
          403                         break;
          404         if(i==cli->nprog){
          405                 werrstr("unknown sun rpc program %d version %d", tx->rpc.prog, tx->rpc.vers);
          406                 return -1;
          407         }
          408         prog = cli->prog[i];
          409 
          410         if(cli->chatty){
          411                 fprint(2, "out: %B\n", &tx->rpc);
          412                 fprint(2, "\t%C\n", tx);
          413         }
          414 
          415         n1 = sunrpcsize(&tx->rpc);
          416         n2 = suncallsize(prog, tx);
          417 
          418         n = n1+n2;
          419         if(cli->needcount)
          420                 n += 4;
          421 
          422         /*
          423          * The dance with 100 is to leave some padding in case
          424          * suncallsize is slightly underestimating.  If this happens,
          425          * the pack will succeed and then we can give a good size
          426          * mismatch error below.  Otherwise the pack fails with
          427          * garbage args, which is less helpful.
          428          */
          429         bp = emalloc(n+100);
          430         ep = bp+n+100;
          431         p = bp;
          432         if(cli->needcount){
          433                 nn = n-4;
          434                 p[0] = (nn>>24)|0x80;
          435                 p[1] = nn>>16;
          436                 p[2] = nn>>8;
          437                 p[3] = nn;
          438                 p += 4;
          439         }
          440         if((ok = sunrpcpack(p, ep, &p, &tx->rpc)) != SunSuccess
          441         || (ok = suncallpack(prog, p, ep, &p, tx)) != SunSuccess){
          442                 sunerrstr(ok);
          443                 free(bp);
          444                 return -1;
          445         }
          446         ep -= 100;
          447         if(p != ep){
          448                 werrstr("rpc: packet size mismatch %d %ld %ld", n, ep-bp, p-bp);
          449                 free(bp);
          450                 return -1;
          451         }
          452 
          453         memset(&o, 0, sizeof o);
          454         o.creply = chancreate(sizeof(void*), 0);
          455         o.tag = tag;
          456         o.p = bp;
          457         o.n = n;
          458 
          459         sendp(cli->rpcchan, &o);
          460         recvp(o.creply);
          461         chanfree(o.creply);
          462 
          463         if(o.p == nil){
          464                 werrstr("%s", o.err);
          465                 return -1;
          466         }
          467 
          468         p = o.rpc.data;
          469         ep = p+o.rpc.ndata;
          470         rx->rpc = o.rpc;
          471         rx->rpc.proc = tx->rpc.proc;
          472         rx->rpc.prog = tx->rpc.prog;
          473         rx->rpc.vers = tx->rpc.vers;
          474         rx->type = (rx->rpc.proc<<1)|1;
          475         if(rx->rpc.status != SunSuccess){
          476                 sunerrstr(rx->rpc.status);
          477                 werrstr("unpack: %r");
          478                 free(o.p);
          479                 return -1;
          480         }
          481 
          482         if((ok = suncallunpack(prog, p, ep, &p, rx)) != SunSuccess){
          483                 sunerrstr(ok);
          484                 werrstr("unpack: %r");
          485                 free(o.p);
          486                 return -1;
          487         }
          488 
          489         if(cli->chatty){
          490                 fprint(2, "in: %B\n", &rx->rpc);
          491                 fprint(2, "in:\t%C\n", rx);
          492         }
          493 
          494         if(tofree)
          495                 *tofree = o.p;
          496         else
          497                 free(o.p);
          498 
          499         return 0;
          500 }