URI:
       ticachewrite.c - plan9port - [fork] Plan 9 from user space
  HTML git clone git://src.adamsgaard.dk/plan9port
   DIR Log
   DIR Files
   DIR Refs
   DIR README
   DIR LICENSE
       ---
       ticachewrite.c (7471B)
       ---
            1 /*
            2  * Write the dirty icache entries to disk.  Random seeks are
            3  * so expensive that it makes sense to wait until we have
            4  * a lot and then just make a sequential pass over the disk.
            5  */
            6 #include "stdinc.h"
            7 #include "dat.h"
            8 #include "fns.h"
            9 
           10 static void icachewriteproc(void*);
           11 static void icachewritecoord(void*);
           12 static IEntry *iesort(IEntry*);
           13 
           14 int icachesleeptime = 1000;        /* milliseconds */
           15 int minicachesleeptime = 0;
           16 
           17 enum
           18 {
           19         Bufsize = 8*1024*1024
           20 };
           21 
           22 typedef struct IWrite IWrite;
           23 struct IWrite
           24 {
           25         Round round;
           26         AState as;
           27 };
           28 
           29 static IWrite iwrite;
           30 
           31 void
           32 initicachewrite(void)
           33 {
           34         int i;
           35         Index *ix;
           36 
           37         initround(&iwrite.round, "icache", 120*60*1000);
           38         ix = mainindex;
           39         for(i=0; i<ix->nsects; i++){
           40                 ix->sects[i]->writechan = chancreate(sizeof(ulong), 1);
           41                 ix->sects[i]->writedonechan = chancreate(sizeof(ulong), 1);
           42                 vtproc(icachewriteproc, ix->sects[i]);
           43         }
           44         vtproc(icachewritecoord, nil);
           45         vtproc(delaykickroundproc, &iwrite.round);
           46 }
           47 
           48 static u64int
           49 ie2diskaddr(Index *ix, ISect *is, IEntry *ie)
           50 {
           51         u64int bucket, addr;
           52 
           53         bucket = hashbits(ie->score, 32)/ix->div;
           54         addr = is->blockbase + ((bucket - is->start) << is->blocklog);
           55         return addr;
           56 }
           57 
           58 static IEntry*
           59 nextchunk(Index *ix, ISect *is, IEntry **pie, u64int *paddr, uint *pnbuf)
           60 {
           61         u64int addr, naddr;
           62         uint nbuf;
           63         int bsize;
           64         IEntry *iefirst, *ie, **l;
           65 
           66         bsize = 1<<is->blocklog;
           67         iefirst = *pie;
           68         addr = ie2diskaddr(ix, is, iefirst);
           69         nbuf = 0;
           70         for(l = &iefirst->nextdirty; (ie = *l) != nil; l = &(*l)->nextdirty){
           71                 naddr = ie2diskaddr(ix, is, ie);
           72                 if(naddr - addr >= Bufsize)
           73                         break;
           74                 nbuf = naddr - addr;
           75         }
           76         nbuf += bsize;
           77 
           78         *l = nil;
           79         *pie = ie;
           80         *paddr = addr;
           81         *pnbuf = nbuf;
           82         return iefirst;
           83 }
           84 
           85 static int
           86 icachewritesect(Index *ix, ISect *is, u8int *buf)
           87 {
           88         int err, i, werr, h, bsize, t;
           89         u32int lo, hi;
           90         u64int addr, naddr;
           91         uint nbuf, off;
           92         DBlock *b;
           93         IBucket ib;
           94         IEntry *ie, *iedirty, **l, *chunk;
           95 
           96         lo = is->start * ix->div;
           97         if(TWID32/ix->div < is->stop)
           98                 hi = TWID32;
           99         else
          100                 hi = is->stop * ix->div - 1;
          101 
          102         trace(TraceProc, "icachewritesect enter %ud %ud %llud",
          103                 lo, hi, iwrite.as.aa);
          104 
          105         iedirty = icachedirty(lo, hi, iwrite.as.aa);
          106         iedirty = iesort(iedirty);
          107         bsize = 1 << is->blocklog;
          108         err = 0;
          109 
          110         while(iedirty){
          111                 disksched();
          112                 while((t = icachesleeptime) == SleepForever){
          113                         sleep(1000);
          114                         disksched();
          115                 }
          116                 if(t < minicachesleeptime)
          117                         t = minicachesleeptime;
          118                 if(t > 0)
          119                         sleep(t);
          120                 trace(TraceProc, "icachewritesect nextchunk");
          121                 chunk = nextchunk(ix, is, &iedirty, &addr, &nbuf);
          122 
          123                 trace(TraceProc, "icachewritesect readpart 0x%llux+0x%ux",
          124                         addr, nbuf);
          125                 if(readpart(is->part, addr, buf, nbuf) < 0){
          126                         fprint(2, "%s: part %s addr 0x%llux: icachewritesect "
          127                                 "readpart: %r\n", argv0, is->part->name, addr);
          128                         err  = -1;
          129                         continue;
          130                 }
          131                 trace(TraceProc, "icachewritesect updatebuf");
          132                 addstat(StatIsectReadBytes, nbuf);
          133                 addstat(StatIsectRead, 1);
          134 
          135                 for(l=&chunk; (ie=*l)!=nil; l=&ie->nextdirty){
          136 again:
          137                         naddr = ie2diskaddr(ix, is, ie);
          138                         off = naddr - addr;
          139                         if(off+bsize > nbuf){
          140                                 fprint(2, "%s: whoops! addr=0x%llux nbuf=%ud "
          141                                         "addr+nbuf=0x%llux naddr=0x%llux\n",
          142                                         argv0, addr, nbuf, addr+nbuf, naddr);
          143                                 assert(off+bsize <= nbuf);
          144                         }
          145                         unpackibucket(&ib, buf+off, is->bucketmagic);
          146                         if(okibucket(&ib, is) < 0){
          147                                 fprint(2, "%s: bad bucket XXX\n", argv0);
          148                                 goto skipit;
          149                         }
          150                         trace(TraceProc, "icachewritesect add %V at 0x%llux",
          151                                 ie->score, naddr);
          152                         h = bucklook(ie->score, ie->ia.type, ib.data, ib.n);
          153                         if(h & 1){
          154                                 h ^= 1;
          155                                 packientry(ie, &ib.data[h]);
          156                         }else if(ib.n < is->buckmax){
          157                                 memmove(&ib.data[h + IEntrySize], &ib.data[h],
          158                                         ib.n*IEntrySize - h);
          159                                 ib.n++;
          160                                 packientry(ie, &ib.data[h]);
          161                         }else{
          162                                 fprint(2, "%s: bucket overflow XXX\n", argv0);
          163 skipit:
          164                                 err = -1;
          165                                 *l = ie->nextdirty;
          166                                 ie = *l;
          167                                 if(ie)
          168                                         goto again;
          169                                 else
          170                                         break;
          171                         }
          172                         packibucket(&ib, buf+off, is->bucketmagic);
          173                 }
          174 
          175                 diskaccess(1);
          176 
          177                 trace(TraceProc, "icachewritesect writepart", addr, nbuf);
          178                 werr = 0;
          179                 if(writepart(is->part, addr, buf, nbuf) < 0 || flushpart(is->part) < 0)
          180                         werr = -1;
          181 
          182                 for(i=0; i<nbuf; i+=bsize){
          183                         if((b = _getdblock(is->part, addr+i, ORDWR, 0)) != nil){
          184                                 memmove(b->data, buf+i, bsize);
          185                                 putdblock(b);
          186                         }
          187                 }
          188 
          189                 if(werr < 0){
          190                         fprint(2, "%s: part %s addr 0x%llux: icachewritesect "
          191                                 "writepart: %r\n", argv0, is->part->name, addr);
          192                         err = -1;
          193                         continue;
          194                 }
          195 
          196                 addstat(StatIsectWriteBytes, nbuf);
          197                 addstat(StatIsectWrite, 1);
          198                 icacheclean(chunk);
          199         }
          200 
          201         trace(TraceProc, "icachewritesect done");
          202         return err;
          203 }
          204 
          205 static void
          206 icachewriteproc(void *v)
          207 {
          208         int ret;
          209         uint bsize;
          210         ISect *is;
          211         Index *ix;
          212         u8int *buf;
          213 
          214         ix = mainindex;
          215         is = v;
          216         threadsetname("icachewriteproc:%s", is->part->name);
          217 
          218         bsize = 1<<is->blocklog;
          219         buf = emalloc(Bufsize+bsize);
          220         buf = (u8int*)(((uintptr)buf+bsize-1)&~(uintptr)(bsize-1));
          221 
          222         for(;;){
          223                 trace(TraceProc, "icachewriteproc recv");
          224                 recv(is->writechan, 0);
          225                 trace(TraceWork, "start");
          226                 ret = icachewritesect(ix, is, buf);
          227                 trace(TraceProc, "icachewriteproc send");
          228                 trace(TraceWork, "finish");
          229                 sendul(is->writedonechan, ret);
          230         }
          231 }
          232 
          233 static void
          234 icachewritecoord(void *v)
          235 {
          236         int i, err;
          237         Index *ix;
          238         AState as;
          239 
          240         USED(v);
          241 
          242         threadsetname("icachewritecoord");
          243 
          244         ix = mainindex;
          245         iwrite.as = icachestate();
          246 
          247         for(;;){
          248                 trace(TraceProc, "icachewritecoord sleep");
          249                 waitforkick(&iwrite.round);
          250                 trace(TraceWork, "start");
          251                 as = icachestate();
          252                 if(as.arena==iwrite.as.arena && as.aa==iwrite.as.aa){
          253                         /* will not be able to do anything more than last flush - kick disk */
          254                         trace(TraceProc, "icachewritecoord kick dcache");
          255                         kickdcache();
          256                         trace(TraceProc, "icachewritecoord kicked dcache");
          257                         goto SkipWork;        /* won't do anything; don't bother rewriting bloom filter */
          258                 }
          259                 iwrite.as = as;
          260 
          261                 trace(TraceProc, "icachewritecoord start flush");
          262                 if(iwrite.as.arena){
          263                         for(i=0; i<ix->nsects; i++)
          264                                 send(ix->sects[i]->writechan, 0);
          265                         if(ix->bloom)
          266                                 send(ix->bloom->writechan, 0);
          267 
          268                         err = 0;
          269                         for(i=0; i<ix->nsects; i++)
          270                                 err |= recvul(ix->sects[i]->writedonechan);
          271                         if(ix->bloom)
          272                                 err |= recvul(ix->bloom->writedonechan);
          273 
          274                         trace(TraceProc, "icachewritecoord donewrite err=%d", err);
          275                         if(err == 0){
          276                                 setatailstate(&iwrite.as);
          277                         }
          278                 }
          279         SkipWork:
          280                 icacheclean(nil);        /* wake up anyone waiting */
          281                 trace(TraceWork, "finish");
          282                 addstat(StatIcacheFlush, 1);
          283         }
          284 }
          285 
          286 void
          287 flushicache(void)
          288 {
          289         trace(TraceProc, "flushicache enter");
          290         kickround(&iwrite.round, 1);
          291         trace(TraceProc, "flushicache exit");
          292 }
          293 
          294 void
          295 kickicache(void)
          296 {
          297         kickround(&iwrite.round, 0);
          298 }
          299 
          300 void
          301 delaykickicache(void)
          302 {
          303         delaykickround(&iwrite.round);
          304 }
          305 
          306 static IEntry*
          307 iesort(IEntry *ie)
          308 {
          309         int cmp;
          310         IEntry **l;
          311         IEntry *ie1, *ie2, *sorted;
          312 
          313         if(ie == nil || ie->nextdirty == nil)
          314                 return ie;
          315 
          316         /* split the lists */
          317         ie1 = ie;
          318         ie2 = ie;
          319         if(ie2)
          320                 ie2 = ie2->nextdirty;
          321         if(ie2)
          322                 ie2 = ie2->nextdirty;
          323         while(ie1 && ie2){
          324                 ie1 = ie1->nextdirty;
          325                 ie2 = ie2->nextdirty;
          326                 if(ie2)
          327                         ie2 = ie2->nextdirty;
          328         }
          329         if(ie1){
          330                 ie2 = ie1->nextdirty;
          331                 ie1->nextdirty = nil;
          332         }
          333 
          334         /* sort the lists */
          335         ie1 = iesort(ie);
          336         ie2 = iesort(ie2);
          337 
          338         /* merge the lists */
          339         sorted = nil;
          340         l = &sorted;
          341         cmp = 0;
          342         while(ie1 || ie2){
          343                 if(ie1 && ie2)
          344                         cmp = scorecmp(ie1->score, ie2->score);
          345                 if(ie1==nil || (ie2 && cmp > 0)){
          346                         *l = ie2;
          347                         l = &ie2->nextdirty;
          348                         ie2 = ie2->nextdirty;
          349                 }else{
          350                         *l = ie1;
          351                         l = &ie1->nextdirty;
          352                         ie1 = ie1->nextdirty;
          353                 }
          354         }
          355         *l = nil;
          356         return sorted;
          357 }