Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
252 changes: 186 additions & 66 deletions src/embedDB/embedDB.c
Original file line number Diff line number Diff line change
Expand Up @@ -152,9 +152,9 @@ int8_t embedDBInit(embedDBState *state, size_t indexMaxError) {
return -1;
}

if (state->numDataPages < (EMBEDDB_USING_RECORD_LEVEL_CONSISTENCY(state->parameters) ? 4 : 2) * state->eraseSizeInPages) {
if (state->numDataPages < 4 * state->eraseSizeInPages) {
#ifdef PRINT_ERRORS
printf("ERROR: The minimum number of data pages is twice the eraseSizeInPages or 4 times the eraseSizeInPages if using record-level consistency.\n");
printf("ERROR: The minimum number of data pages is 4 times the eraseSizeInPages.\n");
#endif
return -1;
}
Expand All @@ -167,6 +167,14 @@ int8_t embedDBInit(embedDBState *state, size_t indexMaxError) {
#endif
return -1;
}

if (state->numVarPages < 4 * state->eraseSizeInPages) {
#ifdef PRINT_ERRORS
printf("ERROR: The minimum number of variable data pages is 4 times the eraseSizeInPages.\n");
#endif
return -1;
}

state->recordSize += 4;
}

Expand All @@ -183,6 +191,21 @@ int8_t embedDBInit(embedDBState *state, size_t indexMaxError) {
#endif
return -1;
}

if (state->numIndexPages < 2 * state->eraseSizeInPages) {
#ifdef PRINT_ERRORS
printf("ERROR: The minimum number of index pages is 4 times the eraseSizeInPages.\n");
#endif
return -1;
}

if (!EMBEDDB_USING_BMAP(state->parameters)) {
#ifdef PRINT_ERRORS
printf("WARNING: Using the data index requires the bitmap to be enabled. This will automatically be enabled.\n");
#endif
state->parameters |= EMBEDDB_USE_BMAP;
}

state->headerSize += state->bitmapSize;
}

Expand Down Expand Up @@ -599,8 +622,8 @@ void embedDBInitSplineFromFile(embedDBState *state) {
int8_t embedDBInitIndex(embedDBState *state) {
/* Setup index file. */

/* 4 for id, 2 for count, 2 unused, 4 for minKey (pageId), 4 for maxKey (pageId) */
state->maxIdxRecordsPerPage = (state->pageSize - 16) / state->bitmapSize;
/* 4 for id, 2 for count, 2 unused, 4 for minKey (pageId), 4 that are currently empty */
state->maxIdxRecordsPerPage = (state->pageSize - EMBEDDB_IDX_HEADER_SIZE) / state->bitmapSize;

/* Allocate third page of buffer as index output page */
initBufferPage(state, EMBEDDB_INDEX_WRITE_BUFFER);
Expand Down Expand Up @@ -657,38 +680,119 @@ int8_t embedDBInitIndexFromFile(embedDBState *state) {
id_t logicalIndexPageId = 0;
id_t maxLogicaIndexPageId = 0;
id_t physicalIndexPageId = 0;
id_t count = 0;
count_t blockSize = state->eraseSizeInPages;
bool validData = false;
bool hasData = false;
void *buffer = (int8_t *)state->buffer + state->pageSize * EMBEDDB_INDEX_READ_BUFFER;

/* This will become zero if there is no more to read */
int8_t moreToRead = !(readIndexPage(state, physicalIndexPageId));

bool haveWrappedInMemory = false;
int count = 0;
void *buffer = (int8_t *)state->buffer + state->pageSize * EMBEDDB_INDEX_READ_BUFFER;

while (moreToRead && count < state->numIndexPages) {
/* this handles the case where the first page may have been erased, so has junk data and we actually need to start from the second page */
uint32_t i = 0;
count_t indexCount = 0;
while (moreToRead && i < 2) {
memcpy(&logicalIndexPageId, buffer, sizeof(id_t));
if (count == 0 || logicalIndexPageId == maxLogicaIndexPageId + 1) {
validData = logicalIndexPageId % state->numIndexPages == count;
indexCount = EMBEDDB_GET_COUNT(buffer);
if (validData && indexCount > 0 && indexCount < state->maxIdxRecordsPerPage + 1) {
hasData = true;
maxLogicaIndexPageId = logicalIndexPageId;
physicalIndexPageId++;
moreToRead = !(readIndexPage(state, physicalIndexPageId));
count++;
i = 2;
} else {
haveWrappedInMemory = logicalIndexPageId == maxLogicaIndexPageId - state->numIndexPages + 1;
break;
physicalIndexPageId += blockSize;
count += blockSize;
}
moreToRead = !(readIndexPage(state, physicalIndexPageId));
i++;
}

if (count == 0)
return 0;
/**
* If we have index data, we need to scan through the file for recovery.
* If there is no data in the file, there still may be bitmaps written to storage on the datafile that we can recover.
**/
if (hasData) {
while (moreToRead && count < state->numIndexPages) {
memcpy(&logicalIndexPageId, buffer, sizeof(id_t));
validData = logicalIndexPageId % state->numIndexPages == count;
if (validData && logicalIndexPageId == maxLogicaIndexPageId + 1) {
maxLogicaIndexPageId = logicalIndexPageId;
physicalIndexPageId++;
moreToRead = !(readIndexPage(state, physicalIndexPageId));
count++;
} else {
break;
}
}

state->nextIdxPageId = maxLogicaIndexPageId + 1;
id_t physicalPageIDOfSmallestData = 0;
if (haveWrappedInMemory) {
physicalPageIDOfSmallestData = logicalIndexPageId % state->numIndexPages;
/*
* Now we need to find where the page with the smallest key that is still valid.
* The default case is we have not wrapped and the page number for the physical page with the smallest key is 0.
*/
id_t physicalPageIDOfSmallestData = 0;

/* check if data exists at this location */
if (moreToRead && count < state->numIndexPages) {
/* find where the next block boundary is */
id_t pagesToBlockBoundary = blockSize - (count % blockSize);

/* go to the next block boundary */
physicalIndexPageId = (physicalIndexPageId + pagesToBlockBoundary) % state->numIndexPages;
moreToRead = !(readIndexPage(state, physicalIndexPageId));

/* there should have been more to read becuase the file should not be empty at this point if it was not empty at the previous block */
if (!moreToRead) {
return -1;
}

/* check if data is valid or if it is junk */
memcpy(&logicalIndexPageId, buffer, sizeof(id_t));
validData = logicalIndexPageId % state->numIndexPages == physicalIndexPageId;

/* this means we have wrapped and our start is actually here */
if (validData) {
physicalPageIDOfSmallestData = physicalIndexPageId;
}
}

state->nextIdxPageId = maxLogicaIndexPageId + 1;
readIndexPage(state, physicalPageIDOfSmallestData);
memcpy(&(state->minIndexPageId), buffer, sizeof(id_t));
state->numAvailIndexPages = state->numIndexPages + state->minIndexPageId - maxLogicaIndexPageId - 1;
}

/* Compute how many bitmaps we should have and how many we do have */
id_t expectedNumberOfIndices = state->nextDataPageId;
id_t numDataIndicies = state->nextIdxPageId * state->maxIdxRecordsPerPage;
id_t numPagesToRead = expectedNumberOfIndices - numDataIndicies;

if (numPagesToRead > 0) {
id_t pageToRead = state->nextDataPageId - numPagesToRead;
int8_t readResult = 0;
void *dataPageBuffer = state->buffer + state->pageSize * EMBEDDB_DATA_READ_BUFFER;
void *indexWriteBuffer = state->buffer + state->pageSize * EMBEDDB_INDEX_WRITE_BUFFER;

/* Add page id to minimum value spot in page */
memcpy((int8_t *)indexWriteBuffer + 8, &pageToRead, sizeof(id_t));

/* Put all bitmaps that were written to storgae on the data page but not yet on the index page into the buffer */
for (id_t i = 0; i < numPagesToRead; i++) {
readResult = readPage(state, pageToRead % state->numDataPages);
if (readResult == -1) {
#ifdef PRINT_ERRORS
printf("Error: Unable to read page from data file!\n");
#endif
return -1;
}
void *bitmap = EMBEDDB_GET_BITMAP(dataPageBuffer);
EMBEDDB_INC_COUNT(indexWriteBuffer);
memcpy((int8_t *)indexWriteBuffer + EMBEDDB_IDX_HEADER_SIZE + state->bitmapSize * i, bitmap, state->bitmapSize);
pageToRead++;
}
}
readIndexPage(state, physicalPageIDOfSmallestData);
memcpy(&(state->minIndexPageId), buffer, sizeof(id_t));
state->numAvailIndexPages = state->numIndexPages + state->minIndexPageId - maxLogicaIndexPageId - 1;

return 0;
}
Expand Down Expand Up @@ -994,6 +1098,40 @@ void indexPage(embedDBState *state, uint32_t pageNumber) {
}
}

/**
* @brief Adds an entry into the data index for a given page.
* @param state embedDB algorithm state structure
* @param pageNumber Page number to index
* @return Return 0 if success. Non-zero value if error.
*/
int8_t indexData(embedDBState *state, id_t pageNumber) {
void *buf = (int8_t *)state->buffer + state->pageSize * (EMBEDDB_INDEX_WRITE_BUFFER);
count_t idxcount = EMBEDDB_GET_COUNT(buf);
if (idxcount >= state->maxIdxRecordsPerPage) {
/* Save index page */
id_t indexPageNumber = writeIndexPage(state, buf);
if (indexPageNumber == -1) {
#ifdef PRINT_ERRORS
printf("ERROR: failed to write index page to storage");
#endif
return -1;
}

idxcount = 0;
initBufferPage(state, EMBEDDB_INDEX_WRITE_BUFFER);

/* Add page id to minimum value spot in page */
memcpy((int8_t *)buf + 8, &pageNumber, sizeof(id_t));
}

EMBEDDB_INC_COUNT(buf);

/* Copy record onto index page */
void *bm = EMBEDDB_GET_BITMAP(state->buffer);
memcpy((void *)((int8_t *)buf + EMBEDDB_IDX_HEADER_SIZE + state->bitmapSize * idxcount), bm, state->bitmapSize);
return 0;
}

/**
* @brief Puts a given key, data pair into structure.
* @param state embedDB algorithm state structure
Expand Down Expand Up @@ -1025,35 +1163,29 @@ int8_t embedDBPut(embedDBState *state, void *key, void *data) {
/* Write current page if full */
bool wrotePage = false;
if (count >= state->maxRecordsPerPage) {
// As the first buffer is the data write buffer, no manipulation is required
id_t pageNum = writePage(state, state->buffer);
void *dataWriteBuffer = state->buffer + state->pageSize * EMBEDDB_DATA_WRITE_BUFFER;
id_t pageNum = writePage(state, dataWriteBuffer);
if (pageNum == -1) {
#ifdef PRINT_ERRORS
printf("ERROR: Unable to write data page when buffer is full. \n");
#endif
return 2;
}

indexPage(state, pageNum);

/* Save record in index file */
if (state->indexFile != NULL) {
void *buf = (int8_t *)state->buffer + state->pageSize * (EMBEDDB_INDEX_WRITE_BUFFER);
count_t idxcount = EMBEDDB_GET_COUNT(buf);
if (idxcount >= state->maxIdxRecordsPerPage) {
/* Save index page */
writeIndexPage(state, buf);

idxcount = 0;
initBufferPage(state, EMBEDDB_INDEX_WRITE_BUFFER);

/* Add page id to minimum value spot in page */
id_t *ptr = (id_t *)((int8_t *)buf + 8);
*ptr = pageNum;
if (EMBEDDB_USING_INDEX(state->parameters)) {
int8_t indexResult = indexData(state, pageNum);
if (pageNum == -1) {
#ifdef PRINT_ERRORS
printf("ERROR: Unable to index data. \n");
#endif
return 3;
}

EMBEDDB_INC_COUNT(buf);

/* Copy record onto index page */
void *bm = EMBEDDB_GET_BITMAP(state->buffer);
memcpy((void *)((int8_t *)buf + EMBEDDB_IDX_HEADER_SIZE + state->bitmapSize * idxcount), bm, state->bitmapSize);
}

updateMaxiumError(state, state->buffer);
updateMaxiumError(state, dataWriteBuffer);

count = 0;
initBufferPage(state, 0);
Expand Down Expand Up @@ -1681,28 +1813,9 @@ int8_t embedDBFlush(embedDBState *state) {
state->fileInterface->flush(state->dataFile);

indexPage(state, pageNum);

int8_t indexResult = 0;
if (EMBEDDB_USING_INDEX(state->parameters)) {
void *buf = (int8_t *)state->buffer + state->pageSize * (EMBEDDB_INDEX_WRITE_BUFFER);
count_t idxcount = EMBEDDB_GET_COUNT(buf);
EMBEDDB_INC_COUNT(buf);

/* Copy record onto index page */
void *bm = EMBEDDB_GET_BITMAP(state->buffer);
memcpy((void *)((int8_t *)buf + EMBEDDB_IDX_HEADER_SIZE + state->bitmapSize * idxcount), bm, state->bitmapSize);

id_t writeResult = writeIndexPage(state, buf);
if (writeResult == -1) {
#ifdef PRINT_ERRORS
printf("Failed to write index page during embedDBFlush.");
#endif
return -1;
}

state->fileInterface->flush(state->indexFile);

/* Reinitialize buffer */
initBufferPage(state, EMBEDDB_INDEX_WRITE_BUFFER);
indexResult = indexData(state, pageNum);
}

/* Reinitialize buffer */
Expand All @@ -1718,6 +1831,14 @@ int8_t embedDBFlush(embedDBState *state) {
return -1;
}
}

if (indexResult == -1) {
#ifdef PRINT_ERRORS
printf("ERROR: Failed to index data when flushing data to storage.");
#endif
return -1;
}

return 0;
}

Expand Down Expand Up @@ -2047,7 +2168,6 @@ int8_t writeTemporaryPage(embedDBState *state, void *buffer) {
}

/* Setup page number in header */
/* TODO: Maybe talk to Ramon about optimizing this */
memcpy(buffer, &(state->nextDataPageId), sizeof(id_t));

/* Wrap if needed */
Expand Down
2 changes: 1 addition & 1 deletion test/test_buffered_read/test_buffered_read.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -316,7 +316,7 @@ embedDBState* init_state() {

// address level parameters
state->numDataPages = 256;
state->numIndexPages = 8;
state->numIndexPages = 16;
state->eraseSizeInPages = 4;

// configure file interface
Expand Down
Loading
Loading