URI:
       tsend.c - plan9port - [fork] Plan 9 from user space
  HTML git clone git://src.adamsgaard.dk/plan9port
   DIR Log
   DIR Files
   DIR Refs
   DIR README
   DIR LICENSE
       ---
       tsend.c (4733B)
       ---
            1 #include <u.h>
            2 #include <libc.h>
            3 #include <venti.h>
            4 #include "queue.h"
            5 
            6 long ventisendbytes, ventisendpackets;
            7 long ventirecvbytes, ventirecvpackets;
            8 
            9 static int
           10 _vtsend(VtConn *z, Packet *p)
           11 {
           12         IOchunk ioc;
           13         int n, tot;
           14         uchar buf[4];
           15 
           16         if(z->state != VtStateConnected) {
           17                 werrstr("session not connected");
           18                 return -1;
           19         }
           20 
           21         /* add framing */
           22         n = packetsize(p);
           23         if(z->version[1] == '2') {
           24                 if(n >= (1<<16)) {
           25                         werrstr("packet too large");
           26                         packetfree(p);
           27                         return -1;
           28                 }
           29                 buf[0] = n>>8;
           30                 buf[1] = n;
           31                 packetprefix(p, buf, 2);
           32                 ventisendbytes += n+2;
           33         } else {
           34                 buf[0] = n>>24;
           35                 buf[1] = n>>16;
           36                 buf[2] = n>>8;
           37                 buf[3] = n;
           38                 packetprefix(p, buf, 4);
           39                 ventisendbytes += n+4;
           40         }
           41         ventisendpackets++;
           42 
           43         tot = 0;
           44         for(;;){
           45                 n = packetfragments(p, &ioc, 1, 0);
           46                 if(n == 0)
           47                         break;
           48                 if(write(z->outfd, ioc.addr, ioc.len) < ioc.len){
           49                         vtlog(VtServerLog, "<font size=-1>%T %s:</font> sending packet %p: %r<br>\n", z->addr, p);
           50                         packetfree(p);
           51                         return -1;
           52                 }
           53                 packetconsume(p, nil, ioc.len);
           54                 tot += ioc.len;
           55         }
           56         vtlog(VtServerLog, "<font size=-1>%T %s:</font> sent packet %p (%d bytes)<br>\n", z->addr, p, tot);
           57         packetfree(p);
           58         return 1;
           59 }
           60 
           61 static int
           62 interrupted(void)
           63 {
           64         char e[ERRMAX];
           65 
           66         rerrstr(e, sizeof e);
           67         return strstr(e, "interrupted") != nil;
           68 }
           69 
           70 
           71 static Packet*
           72 _vtrecv(VtConn *z)
           73 {
           74         uchar buf[10], *b;
           75         int n, need;
           76         Packet *p;
           77         int size, len;
           78 
           79         if(z->state != VtStateConnected) {
           80                 werrstr("session not connected");
           81                 return nil;
           82         }
           83 
           84         p = z->part;
           85         /* get enough for head size */
           86         size = packetsize(p);
           87         need = z->version[1] - '0';        // 2 or 4
           88         while(size < need) {
           89                 b = packettrailer(p, need);
           90                 assert(b != nil);
           91                 if(0) fprint(2, "%d read hdr\n", getpid());
           92                 n = read(z->infd, b, need);
           93                 if(0) fprint(2, "%d got %d (%r)\n", getpid(), n);
           94                 if(n==0 || (n<0 && !interrupted()))
           95                         goto Err;
           96                 size += n;
           97                 packettrim(p, 0, size);
           98         }
           99 
          100         if(packetconsume(p, buf, need) < 0)
          101                 goto Err;
          102         if(z->version[1] == '2') {
          103                 len = (buf[0] << 8) | buf[1];
          104                 size -= 2;
          105         } else {
          106                 len = (buf[0]<<24) | (buf[1]<<16) | (buf[2]<<8) | buf[3];
          107                 size -= 4;
          108         }
          109 
          110         while(size < len) {
          111                 n = len - size;
          112                 if(n > MaxFragSize)
          113                         n = MaxFragSize;
          114                 b = packettrailer(p, n);
          115                 if(0) fprint(2, "%d read body %d\n", getpid(), n);
          116                 n = read(z->infd, b, n);
          117                 if(0) fprint(2, "%d got %d (%r)\n", getpid(), n);
          118                 if(n > 0)
          119                         size += n;
          120                 packettrim(p, 0, size);
          121                 if(n==0 || (n<0 && !interrupted()))
          122                         goto Err;
          123         }
          124         ventirecvbytes += len;
          125         ventirecvpackets++;
          126         p = packetsplit(p, len);
          127         vtlog(VtServerLog, "<font size=-1>%T %s:</font> read packet %p len %d<br>\n", z->addr, p, len);
          128         return p;
          129 Err:
          130         vtlog(VtServerLog, "<font size=-1>%T %s:</font> error reading packet: %r<br>\n", z->addr);
          131         return nil;
          132 }
          133 
          134 /*
          135  * If you fork off two procs running vtrecvproc and vtsendproc,
          136  * then vtrecv/vtsend (and thus vtrpc) will never block except on
          137  * rendevouses, which is nice when it's running in one thread of many.
          138  */
          139 void
          140 vtrecvproc(void *v)
          141 {
          142         Packet *p;
          143         VtConn *z;
          144         Queue *q;
          145 
          146         z = v;
          147         q = _vtqalloc();
          148 
          149         qlock(&z->lk);
          150         z->readq = q;
          151         qlock(&z->inlk);
          152         rwakeup(&z->rpcfork);
          153         qunlock(&z->lk);
          154 
          155         while((p = _vtrecv(z)) != nil)
          156                 if(_vtqsend(q, p) < 0){
          157                         packetfree(p);
          158                         break;
          159                 }
          160         qunlock(&z->inlk);
          161         qlock(&z->lk);
          162         _vtqhangup(q);
          163         while((p = _vtnbqrecv(q)) != nil)
          164                 packetfree(p);
          165         _vtqdecref(q);
          166         z->readq = nil;
          167         rwakeup(&z->rpcfork);
          168         qunlock(&z->lk);
          169         vthangup(z);
          170 }
          171 
          172 void
          173 vtsendproc(void *v)
          174 {
          175         Queue *q;
          176         Packet *p;
          177         VtConn *z;
          178 
          179         z = v;
          180         q = _vtqalloc();
          181 
          182         qlock(&z->lk);
          183         z->writeq = q;
          184         qlock(&z->outlk);
          185         rwakeup(&z->rpcfork);
          186         qunlock(&z->lk);
          187 
          188         while((p = _vtqrecv(q)) != nil)
          189                 if(_vtsend(z, p) < 0)
          190                         break;
          191         qunlock(&z->outlk);
          192         qlock(&z->lk);
          193         _vtqhangup(q);
          194         while((p = _vtnbqrecv(q)) != nil)
          195                 packetfree(p);
          196         _vtqdecref(q);
          197         z->writeq = nil;
          198         rwakeup(&z->rpcfork);
          199         qunlock(&z->lk);
          200         return;
          201 }
          202 
          203 Packet*
          204 vtrecv(VtConn *z)
          205 {
          206         Packet *p;
          207         Queue *q;
          208 
          209         qlock(&z->lk);
          210         if(z->state != VtStateConnected){
          211                 werrstr("not connected");
          212                 qunlock(&z->lk);
          213                 return nil;
          214         }
          215         if(z->readq){
          216                 q = _vtqincref(z->readq);
          217                 qunlock(&z->lk);
          218                 p = _vtqrecv(q);
          219                 _vtqdecref(q);
          220                 return p;
          221         }
          222 
          223         qlock(&z->inlk);
          224         qunlock(&z->lk);
          225         p = _vtrecv(z);
          226         qunlock(&z->inlk);
          227         if(!p)
          228                 vthangup(z);
          229         return p;
          230 }
          231 
          232 int
          233 vtsend(VtConn *z, Packet *p)
          234 {
          235         Queue *q;
          236 
          237         qlock(&z->lk);
          238         if(z->state != VtStateConnected){
          239                 packetfree(p);
          240                 werrstr("not connected");
          241                 qunlock(&z->lk);
          242                 return -1;
          243         }
          244         if(z->writeq){
          245                 q = _vtqincref(z->writeq);
          246                 qunlock(&z->lk);
          247                 if(_vtqsend(q, p) < 0){
          248                         _vtqdecref(q);
          249                         packetfree(p);
          250                         return -1;
          251                 }
          252                 _vtqdecref(q);
          253                 return 0;
          254         }
          255 
          256         qlock(&z->outlk);
          257         qunlock(&z->lk);
          258         if(_vtsend(z, p) < 0){
          259                 qunlock(&z->outlk);
          260                 vthangup(z);
          261                 return -1;
          262         }
          263         qunlock(&z->outlk);
          264         return 0;
          265 }