Skip to content

Commit

Permalink
make disk manager actually delete page (#785)
Browse files Browse the repository at this point in the history
  • Loading branch information
xx01cyx authored and xueqili02 committed Jan 15, 2025
1 parent 01a64ff commit 1887f6e
Show file tree
Hide file tree
Showing 6 changed files with 298 additions and 254 deletions.
80 changes: 37 additions & 43 deletions src/include/storage/disk/disk_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,42 +18,33 @@
#include <future> // NOLINT
#include <mutex> // NOLINT
#include <string>
#include <unordered_map>
#include <vector>

#include "common/config.h"
#include "common/logger.h"

namespace bustub {

/**
* DiskManager takes care of the allocation and deallocation of pages within a database. It performs the reading and
* writing of pages to and from disk, providing a logical file layer within the context of a database management system.
*
* DiskManager uses lazy allocation, meaning that it only allocates space on disk when it is first accessed. It
* maintains a mapping of page ids to their corresponding offsets in the database file. When a page is deleted, it is
* marked as free and can be reused by future allocations.
*/
class DiskManager {
public:
/**
* Creates a new disk manager that writes to the specified database file.
* @param db_file the file name of the database file to write to
*/
explicit DiskManager(const std::filesystem::path &db_file);

/** FOR TEST / LEADERBOARD ONLY, used by DiskManagerMemory */
DiskManager() = default;

virtual ~DiskManager() = default;

/**
* Shut down the disk manager and close all the file resources.
*/
void ShutDown();

/**
* @brief Increases the size of the database file.
*
* This function works like a dynamic array, where the capacity is doubled until all pages can fit.
*
* @param pages The number of pages the caller wants the file used for storage to support.
*/
virtual void IncreaseDiskSpace(size_t pages);

/**
* Write a page to the database file.
* @param page_id id of the page
Expand All @@ -74,32 +65,16 @@ class DiskManager {
*/
virtual void DeletePage(page_id_t page_id);

/**
* Flush the entire log buffer into disk.
* @param log_data raw log data
* @param size size of log entry
*/
void WriteLog(char *log_data, int size);

/**
* Read a log entry from the log file.
* @param[out] log_data output buffer
* @param size size of the log entry
* @param offset offset of the log entry in the file
* @return true if the read was successful, false otherwise
*/
auto ReadLog(char *log_data, int size, int offset) -> bool;

/** @return the number of disk flushes */
auto GetNumFlushes() const -> int;

/** @return true iff the in-memory content has not been flushed yet */
auto GetFlushState() const -> bool;

/** @return the number of disk writes */
auto GetNumWrites() const -> int;

/** @return the number of deletions */
auto GetNumDeletes() const -> int;

/**
Expand All @@ -112,28 +87,47 @@ class DiskManager {
inline auto HasFlushLogFuture() -> bool { return flush_log_f_ != nullptr; }

/** @brief returns the log file name */
inline auto GetLogFileName() const -> std::filesystem::path { return log_name_; }
inline auto GetLogFileName() const -> std::filesystem::path { return log_file_name_; }

/** @brief returns the size of disk space in use */
auto GetDbFileSize() -> size_t {
auto file_size = GetFileSize(db_file_name_);
if (file_size < 0) {
LOG_DEBUG("I/O error: Fail to get db file size");
return -1;
}
return static_cast<size_t>(file_size);
}

protected:
int num_flushes_{0};
int num_writes_{0};
int num_deletes_{0};

/** @brief The capacity of the file used for storage on disk. */
size_t page_capacity_{DEFAULT_DB_IO_SIZE};

private:
auto GetFileSize(const std::string &file_name) -> int;

auto AllocatePage() -> size_t;

// stream to write log file
std::fstream log_io_;
std::filesystem::path log_name_;
std::filesystem::path log_file_name_;
// stream to write db file
std::fstream db_io_;
std::filesystem::path file_name_;
int num_flushes_{0};
int num_writes_{0};
int num_deletes_{0};
std::filesystem::path db_file_name_;

// Records the offset of each page in the db file.
std::unordered_map<page_id_t, size_t> pages_;
// Records the free slots in the db file if pages are deleted, indicated by offset.
std::vector<size_t> free_slots_;

bool flush_log_{false};
std::future<void> *flush_log_f_{nullptr};
// With multiple buffer pool instances, need to protect file access
std::mutex db_io_latch_;

/** @brief The number of pages allocated to the DBMS on disk. */
size_t pages_{0};
/** @brief The capacity of the file used for storage on disk. */
size_t page_capacity_{DEFAULT_DB_IO_SIZE};
};

} // namespace bustub
157 changes: 13 additions & 144 deletions src/include/storage/disk/disk_manager_memory.h
Original file line number Diff line number Diff line change
Expand Up @@ -40,32 +40,15 @@ namespace bustub {
*/
class DiskManagerMemory : public DiskManager {
public:
explicit DiskManagerMemory(size_t pages);
explicit DiskManagerMemory(size_t capacity);

~DiskManagerMemory() override { delete[] memory_; }

/**
* This function should increase the disk space, but since we have a fixed amount of memory we just check that the
* pages are in bounds.
*/
void IncreaseDiskSpace(size_t pages) override;

/**
* Write a page to the database file.
* @param page_id id of the page
* @param page_data raw page data
*/
void WritePage(page_id_t page_id, const char *page_data) override;

/**
* Read a page from the database file.
* @param page_id id of the page
* @param[out] page_data output buffer
*/
void ReadPage(page_id_t page_id, char *page_data) override;

private:
size_t pages_;
char *memory_;
};

Expand All @@ -75,133 +58,21 @@ class DiskManagerMemory : public DiskManager {
*/
class DiskManagerUnlimitedMemory : public DiskManager {
public:
DiskManagerUnlimitedMemory() {
std::scoped_lock l(mutex_);
while (data_.size() < pages_ + 1) {
data_.push_back(std::make_shared<ProtectedPage>());
}
std::fill(recent_access_.begin(), recent_access_.end(), -1);
}

/**
* This function should increase the disk space, but since this is memory we just resize the vector.
*/
void IncreaseDiskSpace(size_t pages) override {
std::scoped_lock l(mutex_);

if (pages < pages_) {
return;
}

while (data_.size() < pages + 1) {
data_.push_back(std::make_shared<ProtectedPage>());
}

pages_ = pages;
}

/**
* Write a page to the database file.
* @param page_id id of the page
* @param page_data raw page data
*/
void WritePage(page_id_t page_id, const char *page_data) override {
ProcessLatency(page_id);

std::unique_lock<std::mutex> l(mutex_);
if (!thread_id_.has_value()) {
thread_id_ = std::this_thread::get_id();
}
if (page_id >= static_cast<int>(data_.size())) {
data_.resize(page_id + 1);
}
if (data_[page_id] == nullptr) {
data_[page_id] = std::make_shared<ProtectedPage>();
}
std::shared_ptr<ProtectedPage> ptr = data_[page_id];
std::unique_lock<std::shared_mutex> l_page(ptr->second);
l.unlock();

memcpy(ptr->first.data(), page_data, BUSTUB_PAGE_SIZE);
num_writes_ += 1;

PostProcessLatency(page_id);
}

/**
* Read a page from the database file.
* @param page_id id of the page
* @param[out] page_data output buffer
*/
void ReadPage(page_id_t page_id, char *page_data) override {
ProcessLatency(page_id);

std::unique_lock<std::mutex> l(mutex_);
if (!thread_id_.has_value()) {
thread_id_ = std::this_thread::get_id();
}
if (page_id >= static_cast<int>(data_.size()) || page_id < 0) {
fmt::println(stderr, "page {} not in range", page_id);
std::terminate();
return;
}
if (data_[page_id] == nullptr) {
fmt::println(stderr, "page {} not exist", page_id, pages_);
std::terminate();
return;
}
std::shared_ptr<ProtectedPage> ptr = data_[page_id];
std::shared_lock<std::shared_mutex> l_page(ptr->second);
l.unlock();

memcpy(page_data, ptr->first.data(), BUSTUB_PAGE_SIZE);

PostProcessLatency(page_id);
}

/**
* Delete a page from the database file. Reclaim the disk space.
* Note: This is a no-op for now without a more complex data structure to
* track deallocated pages.
* @param page_id id of the page
*/
void DeletePage(page_id_t page_id) override { num_deletes_ += 1; }

void ProcessLatency(page_id_t page_id) {
uint64_t sleep_micro_sec = 1000; // for random access, 1ms latency
if (latency_simulator_enabled_) {
std::unique_lock<std::mutex> lck(latency_processor_mutex_);
for (auto &recent_page_id : recent_access_) {
if ((recent_page_id & (~0x3)) == (page_id & (~0x3))) {
sleep_micro_sec = 100; // for access in the same "block", 0.1ms latency
break;
}
if (page_id >= recent_page_id && page_id <= recent_page_id + 3) {
sleep_micro_sec = 100; // for sequential access, 0.1ms latency
break;
}
}
lck.unlock();
std::this_thread::sleep_for(std::chrono::microseconds(sleep_micro_sec));
}
}

void PostProcessLatency(page_id_t page_id) {
if (latency_simulator_enabled_) {
std::scoped_lock<std::mutex> lck(latency_processor_mutex_);
recent_access_[access_ptr_] = page_id;
access_ptr_ = (access_ptr_ + 1) % recent_access_.size();
}
}
DiskManagerUnlimitedMemory();

void WritePage(page_id_t page_id, const char *page_data) override;

void ReadPage(page_id_t page_id, char *page_data) override;

void DeletePage(page_id_t page_id) override;

void ProcessLatency(page_id_t page_id);

void PostProcessLatency(page_id_t page_id);

void EnableLatencySimulator(bool enabled) { latency_simulator_enabled_ = enabled; }

auto GetLastReadThreadAndClear() -> std::optional<std::thread::id> {
std::unique_lock<std::mutex> lck(mutex_);
auto t = thread_id_;
thread_id_ = std::nullopt;
return t;
}
auto GetLastReadThreadAndClear() -> std::optional<std::thread::id>;

private:
bool latency_simulator_enabled_{false};
Expand All @@ -216,8 +87,6 @@ class DiskManagerUnlimitedMemory : public DiskManager {
std::mutex mutex_;
std::optional<std::thread::id> thread_id_;
std::vector<std::shared_ptr<ProtectedPage>> data_;

size_t pages_{DEFAULT_DB_IO_SIZE};
};

} // namespace bustub
9 changes: 0 additions & 9 deletions src/include/storage/disk/disk_scheduler.h
Original file line number Diff line number Diff line change
Expand Up @@ -68,15 +68,6 @@ class DiskScheduler {
*/
auto CreatePromise() -> DiskSchedulerPromise { return {}; };

/**
* @brief Increases the size of the database file to fit the specified number of pages.
*
* This function works like a dynamic array, where the capacity is doubled until all pages can fit.
*
* @param pages The number of pages the caller wants the file used for storage to support.
*/
void IncreaseDiskSpace(size_t pages) { disk_manager_->IncreaseDiskSpace(pages); }

/**
* @brief Deallocates a page on disk.
*
Expand Down
Loading

0 comments on commit 1887f6e

Please sign in to comment.