URI:
       tchannel.c - plan9port - [fork] Plan 9 from user space
  HTML git clone git://src.adamsgaard.dk/plan9port
   DIR Log
   DIR Files
   DIR Refs
   DIR README
   DIR LICENSE
       ---
       tchannel.c (6729B)
       ---
            1 #include "threadimpl.h"
            2 
/*
 * One can go through a lot of effort to avoid this global lock.
 * You have to put locks in all the channels and all the Alt
 * structures.  At the beginning of an alt you have to lock all
 * the channels, but then to try to actually exec an op you
 * have to lock the other guy's alt structure, so that other
 * people aren't trying to use him in some other op at the
 * same time.
 *
 * For Plan 9 apps, it's just not worth the extra effort.
 *
 * chanalt holds chanlock while it tests, executes, or queues
 * the entries of an alt array.
 */
static QLock chanlock;
           15 
           16 Channel*
           17 chancreate(int elemsize, int bufsize)
           18 {
           19         Channel *c;
           20 
           21         c = malloc(sizeof *c+bufsize*elemsize);
           22         if(c == nil)
           23                 sysfatal("chancreate malloc: %r");
           24         memset(c, 0, sizeof *c);
           25         c->elemsize = elemsize;
           26         c->bufsize = bufsize;
           27         c->nbuf = 0;
           28         c->buf = (uchar*)(c+1);
           29         return c;
           30 }
           31 
           32 void
           33 chansetname(Channel *c, char *fmt, ...)
           34 {
           35         char *name;
           36         va_list arg;
           37 
           38         va_start(arg, fmt);
           39         name = vsmprint(fmt, arg);
           40         va_end(arg);
           41         free(c->name);
           42         c->name = name;
           43 }
           44 
           45 /* bug - work out races */
           46 void
           47 chanfree(Channel *c)
           48 {
           49         if(c == nil)
           50                 return;
           51         free(c->name);
           52         free(c->arecv.a);
           53         free(c->asend.a);
           54         free(c);
           55 }
           56 
           57 static void
           58 addarray(_Altarray *a, Alt *alt)
           59 {
           60         if(a->n == a->m){
           61                 a->m += 16;
           62                 a->a = realloc(a->a, a->m*sizeof a->a[0]);
           63         }
           64         a->a[a->n++] = alt;
           65 }
           66 
           67 static void
           68 delarray(_Altarray *a, int i)
           69 {
           70         --a->n;
           71         a->a[i] = a->a[a->n];
           72 }
           73 
/*
 * Turn CHANSND into CHANRCV and CHANRCV into CHANSND.
 *
 * doesn't really work for things other than CHANSND and CHANRCV
 * but is only used as arg to chanarray, which can handle it
 * (chanarray returns nil for any op other than those two).
 */
#define otherop(op)        (CHANSND+CHANRCV-(op))
           79 
           80 static _Altarray*
           81 chanarray(Channel *c, uint op)
           82 {
           83         switch(op){
           84         default:
           85                 return nil;
           86         case CHANSND:
           87                 return &c->asend;
           88         case CHANRCV:
           89                 return &c->arecv;
           90         }
           91 }
           92 
           93 static int
           94 altcanexec(Alt *a)
           95 {
           96         _Altarray *ar;
           97         Channel *c;
           98 
           99         if(a->op == CHANNOP || (c=a->c) == nil)
          100                 return 0;
          101         if(c->bufsize == 0){
          102                 ar = chanarray(c, otherop(a->op));
          103                 return ar && ar->n;
          104         }else{
          105                 switch(a->op){
          106                 default:
          107                         return 0;
          108                 case CHANSND:
          109                         return c->nbuf < c->bufsize;
          110                 case CHANRCV:
          111                         return c->nbuf > 0;
          112                 }
          113         }
          114 }
          115 
          116 static void
          117 altqueue(Alt *a)
          118 {
          119         _Altarray *ar;
          120 
          121         if(a->c == nil)
          122                 return;
          123         ar = chanarray(a->c, a->op);
          124         addarray(ar, a);
          125 }
          126 
          127 static void
          128 altdequeue(Alt *a)
          129 {
          130         int i;
          131         _Altarray *ar;
          132 
          133         ar = chanarray(a->c, a->op);
          134         if(ar == nil){
          135                 fprint(2, "bad use of altdequeue op=%d\n", a->op);
          136                 abort();
          137         }
          138 
          139         for(i=0; i<ar->n; i++)
          140                 if(ar->a[i] == a){
          141                         delarray(ar, i);
          142                         return;
          143                 }
          144         fprint(2, "cannot find self in altdequeue\n");
          145         abort();
          146 }
          147 
          148 static void
          149 altalldequeue(Alt *a)
          150 {
          151         int i;
          152 
          153         for(i=0; a[i].op!=CHANEND && a[i].op!=CHANNOBLK; i++)
          154                 if(a[i].op != CHANNOP)
          155                         altdequeue(&a[i]);
          156 }
          157 
          158 static void
          159 amove(void *dst, void *src, uint n)
          160 {
          161         if(dst){
          162                 if(src == nil)
          163                         memset(dst, 0, n);
          164                 else
          165                         memmove(dst, src, n);
          166         }
          167 }
          168 
          169 /*
          170  * Actually move the data around.  There are up to three
          171  * players: the sender, the receiver, and the channel itself.
          172  * If the channel is unbuffered or the buffer is empty,
          173  * data goes from sender to receiver.  If the channel is full,
          174  * the receiver removes some from the channel and the sender
          175  * gets to put some in.
          176  */
          177 static void
          178 altcopy(Alt *s, Alt *r)
          179 {
          180         Alt *t;
          181         Channel *c;
          182         uchar *cp;
          183 
          184         /*
          185          * Work out who is sender and who is receiver
          186          */
          187         if(s == nil && r == nil)
          188                 return;
          189         assert(s != nil);
          190         c = s->c;
          191         if(s->op == CHANRCV){
          192                 t = s;
          193                 s = r;
          194                 r = t;
          195         }
          196         assert(s==nil || s->op == CHANSND);
          197         assert(r==nil || r->op == CHANRCV);
          198 
          199         /*
          200          * Channel is empty (or unbuffered) - copy directly.
          201          */
          202         if(s && r && c->nbuf == 0){
          203                 amove(r->v, s->v, c->elemsize);
          204                 return;
          205         }
          206 
          207         /*
          208          * Otherwise it's always okay to receive and then send.
          209          */
          210         if(r){
          211                 cp = c->buf + c->off*c->elemsize;
          212                 amove(r->v, cp, c->elemsize);
          213                 --c->nbuf;
          214                 if(++c->off == c->bufsize)
          215                         c->off = 0;
          216         }
          217         if(s){
          218                 cp = c->buf + (c->off+c->nbuf)%c->bufsize*c->elemsize;
          219                 amove(cp, s->v, c->elemsize);
          220                 ++c->nbuf;
          221         }
          222 }
          223 
          224 static void
          225 altexec(Alt *a)
          226 {
          227         int i;
          228         _Altarray *ar;
          229         Alt *other;
          230         Channel *c;
          231 
          232         c = a->c;
          233         ar = chanarray(c, otherop(a->op));
          234         if(ar && ar->n){
          235                 i = rand()%ar->n;
          236                 other = ar->a[i];
          237                 altcopy(a, other);
          238                 altalldequeue(other->thread->alt);
          239                 other->thread->alt = other;
          240                 _threadready(other->thread);
          241         }else
          242                 altcopy(a, nil);
          243 }
          244 
/* Set to nonzero to print a trace of each chanalt call. */
#define dbgalt 0
/*
 * Execute one of the channel operations listed in the array a.
 *
 * The array ends with an entry whose op is CHANEND or CHANNOBLK.
 * CHANEND means the call may block until an entry can run;
 * CHANNOBLK means it returns at once when none can.
 *
 * Returns the index of the entry that was executed, or -1 when
 * nothing could run and the array ends in CHANNOBLK.
 */
int
chanalt(Alt *a)
{
        int i, j, ncan, n, canblock;
        Channel *c;
        _Thread *t;

        /* NOTE(review): presumably checks for 512 bytes of stack headroom - confirm */
        needstack(512);
        /* find the terminator: n is the entry count, and the terminator's op decides blocking */
        for(i=0; a[i].op != CHANEND && a[i].op != CHANNOBLK; i++)
                ;
        n = i;
        canblock = a[i].op == CHANEND;

        /* tag each entry with the calling thread so altexec can find and wake it */
        t = proc()->thread;
        for(i=0; i<n; i++)
                a[i].thread = t;
        t->alt = a;
        qlock(&chanlock);
if(dbgalt) print("alt ");
        /* first pass: count the entries that could run right now */
        ncan = 0;
        for(i=0; i<n; i++){
                c = a[i].c;
if(dbgalt) print(" %c:", "esrnb"[a[i].op]);
if(dbgalt) if(c->name) print("%s", c->name); else print("%p", c);
                if(altcanexec(&a[i])){
if(dbgalt) print("*");
                        ncan++;
                }
        }
        if(ncan){
                /* second pass: run the j'th executable entry, j chosen at random */
                j = rand()%ncan;
                for(i=0; i<n; i++){
                        if(altcanexec(&a[i])){
                                if(j-- == 0){
if(dbgalt){
c = a[i].c;
print(" => %c:", "esrnb"[a[i].op]);
if(c->name) print("%s", c->name); else print("%p", c);
print("\n");
}
                                        altexec(&a[i]);
                                        qunlock(&chanlock);
                                        return i;
                                }
                        }
                }
        }
if(dbgalt)print("\n");

        if(!canblock){
                qunlock(&chanlock);
                return -1;
        }

        /* nothing can run yet: queue every real entry on its channel, then give up the processor */
        for(i=0; i<n; i++){
                if(a[i].op != CHANNOP)
                        altqueue(&a[i]);
        }
        qunlock(&chanlock);

        _threadswitch();

        /*
         * the guy who ran the op took care of dequeueing us
         * and then set t->alt to the one that was executed.
         */
        if(t->alt < a || t->alt >= a+n)
                sysfatal("channel bad alt");
        return t->alt - a;
}
          316 
          317 static int
          318 _chanop(Channel *c, int op, void *p, int canblock)
          319 {
          320         Alt a[2];
          321 
          322         a[0].c = c;
          323         a[0].op = op;
          324         a[0].v = p;
          325         a[1].op = canblock ? CHANEND : CHANNOBLK;
          326         if(chanalt(a) < 0)
          327                 return -1;
          328         return 1;
          329 }
          330 
/*
 * Send the element that v points to on channel c, blocking
 * until the send can be executed.  Returns 1.
 */
int
chansend(Channel *c, void *v)
{
        return _chanop(c, CHANSND, v, 1);
}
          336 
/*
 * Non-blocking send of the element that v points to on channel c.
 * Returns 1 if the send was executed, -1 if it could not be
 * executed immediately.
 */
int
channbsend(Channel *c, void *v)
{
        return _chanop(c, CHANSND, v, 0);
}
          342 
/*
 * Receive one element from channel c into the storage that v
 * points to, blocking until the receive can be executed.
 * Returns 1.
 */
int
chanrecv(Channel *c, void *v)
{
        return _chanop(c, CHANRCV, v, 1);
}
          348 
/*
 * Non-blocking receive from channel c into the storage that v
 * points to.  Returns 1 if an element was received, -1 if the
 * receive could not be executed immediately.
 */
int
channbrecv(Channel *c, void *v)
{
        return _chanop(c, CHANRCV, v, 0);
}
          354 
/*
 * Blocking send of the pointer value v itself on channel c.
 * The element sent is read from the address of the local v.
 * NOTE(review): assumes the channel's elemsize is sizeof(void*) - confirm at the chancreate call.
 */
int
chansendp(Channel *c, void *v)
{
        return _chanop(c, CHANSND, (void*)&v, 1);
}
          360 
          361 void*
          362 chanrecvp(Channel *c)
          363 {
          364         void *v;
          365 
          366         if(_chanop(c, CHANRCV, (void*)&v, 1) > 0)
          367                 return v;
          368         return nil;
          369 }
          370 
/*
 * Non-blocking send of the pointer value v itself on channel c.
 * Returns 1 if the send was executed, -1 if it could not be
 * executed immediately.
 * NOTE(review): assumes the channel's elemsize is sizeof(void*) - confirm at the chancreate call.
 */
int
channbsendp(Channel *c, void *v)
{
        return _chanop(c, CHANSND, (void*)&v, 0);
}
          376 
          377 void*
          378 channbrecvp(Channel *c)
          379 {
          380         void *v;
          381 
          382         if(_chanop(c, CHANRCV, (void*)&v, 0) > 0)
          383                 return v;
          384         return nil;
          385 }
          386 
/*
 * Blocking send of the ulong val on channel c.
 * The element sent is read from the address of the local val.
 * NOTE(review): assumes the channel's elemsize is sizeof(ulong) - confirm at the chancreate call.
 */
int
chansendul(Channel *c, ulong val)
{
        return _chanop(c, CHANSND, &val, 1);
}
          392 
          393 ulong
          394 chanrecvul(Channel *c)
          395 {
          396         ulong val;
          397 
          398         if(_chanop(c, CHANRCV, &val, 1) > 0)
          399                 return val;
          400         return 0;
          401 }
          402 
/*
 * Non-blocking send of the ulong val on channel c.
 * Returns 1 if the send was executed, -1 if it could not be
 * executed immediately.
 * NOTE(review): assumes the channel's elemsize is sizeof(ulong) - confirm at the chancreate call.
 */
int
channbsendul(Channel *c, ulong val)
{
        return _chanop(c, CHANSND, &val, 0);
}
          408 
          409 ulong
          410 channbrecvul(Channel *c)
          411 {
          412         ulong val;
          413 
          414         if(_chanop(c, CHANRCV, &val, 0) > 0)
          415                 return val;
          416         return 0;
          417 }