URI:
       tio.c - plan9port - [fork] Plan 9 from user space
  HTML git clone git://src.adamsgaard.dk/plan9port
   DIR Log
   DIR Files
   DIR Refs
   DIR README
   DIR LICENSE
       ---
       tio.c (2279B)
       ---
            1 /* Copyright (C) 2003 Russ Cox, Massachusetts Institute of Technology */
            2 /* See COPYRIGHT */
            3 
            4 #include <u.h>
            5 #include <libc.h>
            6 #include <mux.h>
            7 
            8 /*
            9  * If you fork off two procs running muxrecvproc and muxsendproc,
           10  * then muxrecv/muxsend (and thus muxrpc) will never block except on
           11  * rendevouses, which is nice when it's running in one thread of many.
           12  */
           13 void
           14 _muxrecvproc(void *v)
           15 {
           16         void *p;
           17         Mux *mux;
           18         Muxqueue *q;
           19 
           20         mux = v;
           21         q = _muxqalloc();
           22 
           23         qlock(&mux->lk);
           24         mux->readq = q;
           25         qlock(&mux->inlk);
           26         rwakeup(&mux->rpcfork);
           27         qunlock(&mux->lk);
           28 
           29         while((p = mux->recv(mux)) != nil)
           30                 if(_muxqsend(q, p) < 0){
           31                         free(p);
           32                         break;
           33                 }
           34         qunlock(&mux->inlk);
           35         qlock(&mux->lk);
           36         _muxqhangup(q);
           37         p = nil;
           38         while(_muxnbqrecv(q, &p) && p != nil){
           39                 free(p);
           40                 p = nil;
           41         }
           42         free(q);
           43         mux->readq = nil;
           44         rwakeup(&mux->rpcfork);
           45         qunlock(&mux->lk);
           46 }
           47 
           48 void
           49 _muxsendproc(void *v)
           50 {
           51         Muxqueue *q;
           52         void *p;
           53         Mux *mux;
           54 
           55         mux = v;
           56         q = _muxqalloc();
           57 
           58         qlock(&mux->lk);
           59         mux->writeq = q;
           60         qlock(&mux->outlk);
           61         rwakeup(&mux->rpcfork);
           62         qunlock(&mux->lk);
           63 
           64         while((p = _muxqrecv(q)) != nil)
           65                 if(mux->send(mux, p) < 0)
           66                         break;
           67         qunlock(&mux->outlk);
           68         qlock(&mux->lk);
           69         _muxqhangup(q);
           70         while(_muxnbqrecv(q, &p))
           71                 free(p);
           72         free(q);
           73         mux->writeq = nil;
           74         rwakeup(&mux->rpcfork);
           75         qunlock(&mux->lk);
           76         return;
           77 }
           78 
           79 int
           80 _muxrecv(Mux *mux, int canblock, void **vp)
           81 {
           82         void *p;
           83         int ret;
           84 
           85         qlock(&mux->lk);
           86         if(mux->readq){
           87                 qunlock(&mux->lk);
           88                 if(canblock){
           89                         *vp = _muxqrecv(mux->readq);
           90                         return 1;
           91                 }
           92                 return _muxnbqrecv(mux->readq, vp);
           93         }
           94 
           95         qlock(&mux->inlk);
           96         qunlock(&mux->lk);
           97         if(canblock){
           98                 p = mux->recv(mux);
           99                 ret = 1;
          100         }else{
          101                 if(mux->nbrecv)
          102                         ret = mux->nbrecv(mux, &p);
          103                 else{
          104                         /* send eof, not "no packet ready" */
          105                         p = nil;
          106                         ret = 1;
          107                 }
          108         }
          109         qunlock(&mux->inlk);
          110         *vp = p;
          111         return ret;
          112 }
          113 
          114 int
          115 _muxsend(Mux *mux, void *p)
          116 {
          117         qlock(&mux->lk);
          118 /*
          119         if(mux->state != VtStateConnected){
          120                 packetfree(p);
          121                 werrstr("not connected");
          122                 qunlock(&mux->lk);
          123                 return -1;
          124         }
          125 */
          126         if(mux->writeq){
          127                 qunlock(&mux->lk);
          128                 if(_muxqsend(mux->writeq, p) < 0){
          129                         free(p);
          130                         return -1;
          131                 }
          132                 return 0;
          133         }
          134 
          135         qlock(&mux->outlk);
          136         qunlock(&mux->lk);
          137         if(mux->send(mux, p) < 0){
          138                 qunlock(&mux->outlk);
          139                 /* vthangup(mux); */
          140                 return -1;
          141         }
          142         qunlock(&mux->outlk);
          143         return 0;
          144 }