URI:
       tlumpqueue.c - plan9port - [fork] Plan 9 from user space
  HTML git clone git://src.adamsgaard.dk/plan9port
   DIR Log
   DIR Files
   DIR Refs
   DIR README
   DIR LICENSE
       ---
       tlumpqueue.c (2721B)
       ---
            1 #include "stdinc.h"
            2 #include "dat.h"
            3 #include "fns.h"
            4 
            5 typedef struct LumpQueue        LumpQueue;
            6 typedef struct WLump                WLump;
            7 
            8 enum
            9 {
           10         MaxLumpQ        = 1 << 3        /* max. lumps on a single write queue, must be pow 2 */
           11 };
           12 
           13 struct WLump
           14 {
           15         Lump        *u;
           16         Packet        *p;
           17         int        creator;
           18         int        gen;
           19         uint        ms;
           20 };
           21 
           22 struct LumpQueue
           23 {
           24         QLock        lock;
           25         Rendez         flush;
           26         Rendez        full;
           27         Rendez        empty;
           28         WLump        q[MaxLumpQ];
           29         int        w;
           30         int        r;
           31 };
           32 
           33 static LumpQueue        *lumpqs;
           34 static int                nqs;
           35 
           36 static QLock                glk;
           37 static int                gen;
           38 
           39 static void        queueproc(void *vq);
           40 
           41 int
           42 initlumpqueues(int nq)
           43 {
           44         LumpQueue *q;
           45 
           46         int i;
           47         nqs = nq;
           48 
           49         lumpqs = MKNZ(LumpQueue, nq);
           50 
           51         for(i = 0; i < nq; i++){
           52                 q = &lumpqs[i];
           53                 q->full.l = &q->lock;
           54                 q->empty.l = &q->lock;
           55                 q->flush.l = &q->lock;
           56 
           57                 if(vtproc(queueproc, q) < 0){
           58                         seterr(EOk, "can't start write queue slave: %r");
           59                         return -1;
           60                 }
           61         }
           62 
           63         return 0;
           64 }
           65 
           66 /*
           67  * queue a lump & it's packet data for writing
           68  */
           69 int
           70 queuewrite(Lump *u, Packet *p, int creator, uint ms)
           71 {
           72         LumpQueue *q;
           73         int i;
           74 
           75         trace(TraceProc, "queuewrite");
           76         i = indexsect(mainindex, u->score);
           77         if(i < 0 || i >= nqs){
           78                 seterr(EBug, "internal error: illegal index section in queuewrite");
           79                 return -1;
           80         }
           81 
           82         q = &lumpqs[i];
           83 
           84         qlock(&q->lock);
           85         while(q->r == ((q->w + 1) & (MaxLumpQ - 1))){
           86                 trace(TraceProc, "queuewrite sleep");
           87                 rsleep(&q->full);
           88         }
           89 
           90         q->q[q->w].u = u;
           91         q->q[q->w].p = p;
           92         q->q[q->w].creator = creator;
           93         q->q[q->w].ms = ms;
           94         q->q[q->w].gen = gen;
           95         q->w = (q->w + 1) & (MaxLumpQ - 1);
           96 
           97         trace(TraceProc, "queuewrite wakeup");
           98         rwakeup(&q->empty);
           99 
          100         qunlock(&q->lock);
          101 
          102         return 0;
          103 }
          104 
          105 void
          106 flushqueue(void)
          107 {
          108         int i;
          109         LumpQueue *q;
          110 
          111         if(!lumpqs)
          112                 return;
          113 
          114         trace(TraceProc, "flushqueue");
          115 
          116         qlock(&glk);
          117         gen++;
          118         qunlock(&glk);
          119 
          120         for(i=0; i<mainindex->nsects; i++){
          121                 q = &lumpqs[i];
          122                 qlock(&q->lock);
          123                 while(q->w != q->r && gen - q->q[q->r].gen > 0){
          124                         trace(TraceProc, "flushqueue sleep q%d", i);
          125                         rsleep(&q->flush);
          126                 }
          127                 qunlock(&q->lock);
          128         }
          129 }
          130 
          131 static void
          132 queueproc(void *vq)
          133 {
          134         LumpQueue *q;
          135         Lump *u;
          136         Packet *p;
          137         int creator;
          138         uint ms;
          139 
          140         threadsetname("queueproc");
          141 
          142         q = vq;
          143         for(;;){
          144                 qlock(&q->lock);
          145                 while(q->w == q->r){
          146                         trace(TraceProc, "queueproc sleep empty");
          147                         rsleep(&q->empty);
          148                 }
          149 
          150                 u = q->q[q->r].u;
          151                 p = q->q[q->r].p;
          152                 creator = q->q[q->r].creator;
          153                 ms = q->q[q->r].ms;
          154 
          155                 q->r = (q->r + 1) & (MaxLumpQ - 1);
          156                 trace(TraceProc, "queueproc wakeup flush");
          157                 rwakeupall(&q->flush);
          158 
          159                 trace(TraceProc, "queueproc wakeup full");
          160                 rwakeup(&q->full);
          161 
          162                 qunlock(&q->lock);
          163 
          164                 trace(TraceProc, "queueproc writelump %V", u->score);
          165                 if(writeqlump(u, p, creator, ms) < 0)
          166                         fprint(2, "failed to write lump for %V: %r", u->score);
          167                 trace(TraceProc, "queueproc wrotelump %V", u->score);
          168 
          169                 putlump(u);
          170         }
          171 }