URI:
       tsortientry.c - plan9port - [fork] Plan 9 from user space
  HTML git clone git://src.adamsgaard.dk/plan9port
   DIR Log
   DIR Files
   DIR Refs
   DIR README
   DIR LICENSE
       ---
       tsortientry.c (8324B)
       ---
            1 #include "stdinc.h"
            2 #include "dat.h"
            3 #include "fns.h"
            4 #include <bio.h>
            5 
/* bookkeeping types for the external bucket sort implemented in this file */
typedef struct IEBuck	IEBuck;
typedef struct IEBucks	IEBucks;

enum
{
	/* number of ClumpInfo entries readarenainfo requests per readclumpinfos call */
	ClumpChunks	= 32*1024
};
           13 
/*
 * state of one bucket of the external bucket sort
 */
struct IEBuck
{
	u32int	head;		/* most recently written chunk of this bucket's on-disk chain; TWID32 if none */
	u32int	used;		/* bytes of buf currently filled with packed entries */
	u64int	total;		/* total number of entry bytes flushed to disk for this bucket */
	u8int	*buf;		/* in-memory chunk of entries for this bucket; nil once sorting starts */
};
           21 
/*
 * state of the whole external bucket sort
 */
struct IEBucks
{
	Part	*part;		/* partition holding the chunks and, after them, the sorted output */
	u64int	off;		/* offset for writing sorted data in the partition */
	u32int	chunks;		/* total chunks written to part */
	u64int	max;		/* max bytes entered in any one bucket */
	int	bits;		/* number of bits in initial bucket sort */
	int	nbucks;		/* 1 << bits, the number of buckets */
	u32int	size;		/* bytes in each of the buckets' chunks */
	u32int	usable;		/* bytes of a chunk usable for IEntry data; a multiple of IEntrySize */
	u8int	*buf;		/* chunk buffers, aligned inside xbuf; later the final sorting buffer */
	u8int	*xbuf;		/* raw allocation the chunk buffers are carved from */
	IEBuck	*bucks;		/* the nbucks buckets */
};
           36 
           37 #define        U32GET(p)        (((p)[0]<<24)|((p)[1]<<16)|((p)[2]<<8)|(p)[3])
           38 #define        U32PUT(p,v)        (p)[0]=(v)>>24;(p)[1]=(v)>>16;(p)[2]=(v)>>8;(p)[3]=(v)
           39 
/* forward declarations of the helpers defined later in this file */
static IEBucks	*initiebucks(Part *part, int bits, u32int size);
static int	flushiebuck(IEBucks *ib, int b, int reset);
static int	flushiebucks(IEBucks *ib);
static u32int	sortiebuck(IEBucks *ib, int b);
static u64int	sortiebucks(IEBucks *ib);
static int	sprayientry(IEBucks *ib, IEntry *ie);
static u32int	readarenainfo(IEBucks *ib, Arena *arena, u64int a, Bloom *b);
static u32int	readiebuck(IEBucks *ib, int b);
static void	freeiebucks(IEBucks *ib);
           49 
           50 /*
           51  * build a sorted file with all IEntries which should be in ix.
           52  * assumes the arenas' directories are up to date.
           53  * reads each, converts the entries to index entries,
           54  * and sorts them.
           55  */
           56 u64int
           57 sortrawientries(Index *ix, Part *tmp, u64int *base, Bloom *bloom)
           58 {
           59         IEBucks *ib;
           60         u64int clumps, sorted;
           61         u32int n;
           62         int i, ok;
           63 
           64 /* ZZZ should allow configuration of bits, bucket size */
           65         ib = initiebucks(tmp, 8, 64*1024);
           66         if(ib == nil){
           67                 seterr(EOk, "can't create sorting buckets: %r");
           68                 return TWID64;
           69         }
           70         ok = 0;
           71         clumps = 0;
           72         fprint(2, "constructing entry list\n");
           73         for(i = 0; i < ix->narenas; i++){
           74                 n = readarenainfo(ib, ix->arenas[i], ix->amap[i].start, bloom);
           75                 if(n == TWID32){
           76                         ok = -1;
           77                         break;
           78                 }
           79                 clumps += n;
           80         }
           81         fprint(2, "sorting %lld entries\n", clumps);
           82         if(ok == 0){
           83                 sorted = sortiebucks(ib);
           84                 *base = (u64int)ib->chunks * ib->size;
           85                 if(sorted != clumps){
           86                         fprint(2, "sorting messed up: clumps=%lld sorted=%lld\n", clumps, sorted);
           87                         ok = -1;
           88                 }
           89         }
           90         freeiebucks(ib);
           91         if(ok < 0)
           92                 return TWID64;
           93         return clumps;
           94 }
           95 
/*
 * debugging aid: call xabort() unless the ulong four words before cis
 * holds 0xA110C09.
 * NOTE(review): presumably this inspects an allocator header magic - confirm.
 * Expands to a bare if statement, so it is unsafe before an else.
 */
#define CHECK(cis)	if(((ulong*)cis)[-4] != 0xA110C09) xabort();
           97 
/*
 * force a crash by storing through a nil pointer.
 */
void
xabort(void)
{
	int *x;

	x = 0;
	*x = 0;		/* deliberate nil dereference */
}
          106 
          107 /*
          108  * read in all of the arena's clump directory,
          109  * convert to IEntry format, and bucket sort based
          110  * on the first few bits.
          111  */
          112 static u32int
          113 readarenainfo(IEBucks *ib, Arena *arena, u64int a, Bloom *b)
          114 {
          115         IEntry ie;
          116         ClumpInfo *ci, *cis;
          117         u32int clump;
          118         int i, n, ok, nskip;
          119 
          120         if(arena->memstats.clumps)
          121                 fprint(2, "\tarena %s: %d entries\n", arena->name, arena->memstats.clumps);
          122         else
          123                 fprint(2, "[%s] ", arena->name);
          124 
          125         cis = MKN(ClumpInfo, ClumpChunks);
          126         ok = 0;
          127         nskip = 0;
          128         memset(&ie, 0, sizeof(IEntry));
          129         for(clump = 0; clump < arena->memstats.clumps; clump += n){
          130                 n = ClumpChunks;
          131                 if(n > arena->memstats.clumps - clump)
          132                         n = arena->memstats.clumps - clump;
          133                 if(readclumpinfos(arena, clump, cis, n) != n){
          134                         seterr(EOk, "arena directory read failed: %r");
          135                         ok = -1;
          136                         break;
          137                 }
          138 
          139                 for(i = 0; i < n; i++){
          140                         ci = &cis[i];
          141                         ie.ia.type = ci->type;
          142                         ie.ia.size = ci->uncsize;
          143                         ie.ia.addr = a;
          144                         a += ci->size + ClumpSize;
          145                         ie.ia.blocks = (ci->size + ClumpSize + (1 << ABlockLog) - 1) >> ABlockLog;
          146                         scorecp(ie.score, ci->score);
          147                         if(ci->type == VtCorruptType){
          148                                 if(0) print("! %V %22lld %3d %5d %3d\n",
          149                                         ie.score, ie.ia.addr, ie.ia.type, ie.ia.size, ie.ia.blocks);
          150                                 nskip++;
          151                         }else
          152                                 sprayientry(ib, &ie);
          153                         markbloomfilter(b, ie.score);
          154                 }
          155         }
          156         free(cis);
          157         if(ok < 0)
          158                 return TWID32;
          159         return clump - nskip;
          160 }
          161 
          162 /*
          163  * initialize the external bucket sorting data structures
          164  */
          165 static IEBucks*
          166 initiebucks(Part *part, int bits, u32int size)
          167 {
          168         IEBucks *ib;
          169         int i;
          170 
          171         ib = MKZ(IEBucks);
          172         if(ib == nil){
          173                 seterr(EOk, "out of memory");
          174                 return nil;
          175         }
          176         ib->bits = bits;
          177         ib->nbucks = 1 << bits;
          178         ib->size = size;
          179         ib->usable = (size - U32Size) / IEntrySize * IEntrySize;
          180         ib->bucks = MKNZ(IEBuck, ib->nbucks);
          181         if(ib->bucks == nil){
          182                 seterr(EOk, "out of memory allocation sorting buckets");
          183                 freeiebucks(ib);
          184                 return nil;
          185         }
          186         ib->xbuf = MKN(u8int, size * ((1 << bits)+1));
          187         ib->buf = (u8int*)(((uintptr)ib->xbuf+size-1)&~(uintptr)(size-1));
          188         if(ib->buf == nil){
          189                 seterr(EOk, "out of memory allocating sorting buckets' buffers");
          190                 freeiebucks(ib);
          191                 return nil;
          192         }
          193         for(i = 0; i < ib->nbucks; i++){
          194                 ib->bucks[i].head = TWID32;
          195                 ib->bucks[i].buf = &ib->buf[i * size];
          196         }
          197         ib->part = part;
          198         return ib;
          199 }
          200 
          201 static void
          202 freeiebucks(IEBucks *ib)
          203 {
          204         if(ib == nil)
          205                 return;
          206         free(ib->bucks);
          207         free(ib->buf);
          208         free(ib);
          209 }
          210 
          211 /*
          212  * initial sort: put the entry into the correct bucket
          213  */
          214 static int
          215 sprayientry(IEBucks *ib, IEntry *ie)
          216 {
          217         u32int n;
          218         int b;
          219 
          220         b = hashbits(ie->score, ib->bits);
          221         n = ib->bucks[b].used;
          222         if(n + IEntrySize > ib->usable){
          223                 /* should be flushed below, but if flush fails, this can happen */
          224                 seterr(EOk, "out of space in bucket");
          225                 return -1;
          226         }
          227         packientry(ie, &ib->bucks[b].buf[n]);
          228         n += IEntrySize;
          229         ib->bucks[b].used = n;
          230         if(n + IEntrySize <= ib->usable)
          231                 return 0;
          232         return flushiebuck(ib, b, 1);
          233 }
          234 
          235 /*
          236  * finish sorting:
          237  * for each bucket, read it in and sort it
          238  * write out the the final file
          239  */
          240 static u64int
          241 sortiebucks(IEBucks *ib)
          242 {
          243         u64int tot;
          244         u32int n;
          245         int i;
          246 
          247         if(flushiebucks(ib) < 0)
          248                 return TWID64;
          249         for(i = 0; i < ib->nbucks; i++)
          250                 ib->bucks[i].buf = nil;
          251         ib->off = (u64int)ib->chunks * ib->size;
          252         free(ib->xbuf);
          253 
          254         ib->buf = MKN(u8int, ib->max + U32Size);
          255         if(ib->buf == nil){
          256                 seterr(EOk, "out of memory allocating final sorting buffer; try more buckets");
          257                 return TWID64;
          258         }
          259         tot = 0;
          260         for(i = 0; i < ib->nbucks; i++){
          261                 n = sortiebuck(ib, i);
          262                 if(n == TWID32)
          263                         return TWID64;
          264                 if(n != ib->bucks[i].total/IEntrySize)
          265                         fprint(2, "bucket %d changed count %d => %d\n",
          266                                 i, (int)(ib->bucks[i].total/IEntrySize), n);
          267                 tot += n;
          268         }
          269         return tot;
          270 }
          271 
          272 /*
          273  * sort from bucket b of ib into the output file to
          274  */
          275 static u32int
          276 sortiebuck(IEBucks *ib, int b)
          277 {
          278         u32int n;
          279 
          280         n = readiebuck(ib, b);
          281         if(n == TWID32)
          282                 return TWID32;
          283         qsort(ib->buf, n, IEntrySize, ientrycmp);
          284         if(writepart(ib->part, ib->off, ib->buf, n*IEntrySize) < 0){
          285                 seterr(EOk, "can't write sorted bucket: %r");
          286                 return TWID32;
          287         }
          288         ib->off += n * IEntrySize;
          289         return n;
          290 }
          291 
          292 /*
          293  * write out a single bucket
          294  */
          295 static int
          296 flushiebuck(IEBucks *ib, int b, int reset)
          297 {
          298         u32int n;
          299 
          300         if(ib->bucks[b].used == 0)
          301                 return 0;
          302         n = ib->bucks[b].used;
          303         U32PUT(&ib->bucks[b].buf[n], ib->bucks[b].head);
          304         n += U32Size;
          305         USED(n);
          306         if(writepart(ib->part, (u64int)ib->chunks * ib->size, ib->bucks[b].buf, ib->size) < 0){
          307                 seterr(EOk, "can't write sorting bucket to file: %r");
          308 xabort();
          309                 return -1;
          310         }
          311         ib->bucks[b].head = ib->chunks++;
          312         ib->bucks[b].total += ib->bucks[b].used;
          313         if(reset)
          314                 ib->bucks[b].used = 0;
          315         return 0;
          316 }
          317 
          318 /*
          319  * write out all of the buckets, and compute
          320  * the maximum size of any bucket
          321  */
          322 static int
          323 flushiebucks(IEBucks *ib)
          324 {
          325         int i;
          326 
          327         for(i = 0; i < ib->nbucks; i++){
          328                 if(flushiebuck(ib, i, 0) < 0)
          329                         return -1;
          330                 if(ib->bucks[i].total > ib->max)
          331                         ib->max = ib->bucks[i].total;
          332         }
          333         return 0;
          334 }
          335 
          336 /*
          337  * read in the chained buffers for bucket b,
          338  * and return it's total number of IEntries
          339  */
          340 static u32int
          341 readiebuck(IEBucks *ib, int b)
          342 {
          343         u32int head, m, n;
          344 
          345         head = ib->bucks[b].head;
          346         n = 0;
          347         m = ib->bucks[b].used;
          348         if(m == 0)
          349                 m = ib->usable;
          350         if(0) if(ib->bucks[b].total)
          351                 fprint(2, "\tbucket %d: %lld entries\n", b, ib->bucks[b].total/IEntrySize);
          352         while(head != TWID32){
          353                 if(readpart(ib->part, (u64int)head * ib->size, &ib->buf[n], m+U32Size) < 0){
          354                         seterr(EOk, "can't read index sort bucket: %r");
          355                         return TWID32;
          356                 }
          357                 n += m;
          358                 head = U32GET(&ib->buf[n]);
          359                 m = ib->usable;
          360         }
          361         if(n != ib->bucks[b].total)
          362                 fprint(2, "\tbucket %d: expected %d entries, got %d\n",
          363                         b, (int)ib->bucks[b].total/IEntrySize, n/IEntrySize);
          364         return n / IEntrySize;
          365 }