From e54a79ca92fa02464b130217cf949279e35f3a7f Mon Sep 17 00:00:00 2001 From: anthony4m Date: Wed, 5 Feb 2025 18:38:04 +0000 Subject: [PATCH 01/16] Fix: Correct buffer manager policy call (123) Corrected the way the buffer manager policy is called in `logmgr.go`. The change ensures the correct method is used to allocate buffers for log blocks. --- log/logmgr.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/log/logmgr.go b/log/logmgr.go index 3076373..b66d3b2 100644 --- a/log/logmgr.go +++ b/log/logmgr.go @@ -69,7 +69,7 @@ func NewLogMgr(fm *kfile.FileMgr, bm *buffer.BufferMgr, logFile string) (*LogMgr return nil, &Error{Op: "new", Err: fmt.Errorf("failed to append initial block: %w", err)} } // Inform the buffer manager that this block is in use. - lm.bm.Policy.AllocateBufferForBlock(*lm.currentBlock) + lm.bm.Policy().AllocateBufferForBlock(*lm.currentBlock) } else { // Otherwise, set the current block as the last block. lm.currentBlock = kfile.NewBlockId(logFile, lm.logsize-1) @@ -164,7 +164,7 @@ func (lm *LogMgr) Append(logrec []byte) (int, []byte, error) { return 0, nil, &Error{Op: "append", Err: fmt.Errorf("failed to append new block: %w", err)} } // You may want to inform the buffer manager about the new block. - lm.bm.Policy.AllocateBufferForBlock(*lm.currentBlock) + lm.bm.Policy().AllocateBufferForBlock(*lm.currentBlock) // Try inserting again into the new log page. logPage = lm.logBuffer.Contents() if err = logPage.InsertCell(cell); err != nil { From 3674365110ad3e40976453ec1211281dcfc5f6b5 Mon Sep 17 00:00:00 2001 From: anthony4m Date: Wed, 5 Feb 2025 18:38:22 +0000 Subject: [PATCH 02/16] feat: Improve transaction manager (transaction) Refactor transaction manager to improve code style and add functionality. Added error handling and improved concurrency control. Added FindCell and InsertCell functions. --- transaction/transactionMgr.go | 58 +++++++++++++++++++++++++---------- 1 file changed, 41 insertions(+), 17 deletions(-) diff --git a/transaction/transactionMgr.go b/transaction/transactionMgr.go index c37389a..54bfe0d 100644 --- a/transaction/transactionMgr.go +++ b/transaction/transactionMgr.go @@ -20,10 +20,10 @@ type TransactionMgr struct { func NewTransaction(fm *kfile.FileMgr, lm *log.LogMgr, bm *buffer.BufferMgr) *TransactionMgr { tx := &TransactionMgr{ - fm: fm, - bm: bm, - txtnum: nextTxNumber(), + fm: fm, + bm: bm, } + tx.nextTxNum = tx.nextTxNumber() tx.rm = NewRecoveryMgr(tx, tx.txtnum, lm, bm) tx.cm = NewConcurrencyMgr() tx.bufferList = NewBufferList(bm) @@ -54,28 +54,52 @@ func (t *TransactionMgr) UnPin(blk *kfile.BlockId) { bufferList.UnPin(blk) } -func (t *TransactionMgr) Size(filename string)int { - dummyblk := kfile.NewBlockId(filename, t.EndOfFile); - t.cm.sLock(dummyblk); - fileLength,err := t.fm.LengthLocked(filename); - if err != nil{ +func (t *TransactionMgr) Size(filename string) int { + dummyblk := kfile.NewBlockId(filename, t.EndOfFile) + t.cm.sLock(dummyblk) + fileLength, err := t.fm.LengthLocked(filename) + if err != nil { return 0 } - return fileLength; + return fileLength } func (t *TransactionMgr) append(filename string) *kfile.BlockId { - BlockId dummyblk = new BlockId(filename, END_OF_FILE); - concurMgr.xLock(dummyblk); - return fm.append(filename); + dummyblk := kfile.NewBlockId(filename, t.EndOfFile) + t.cm.xLock(dummyblk) + blk, err := t.fm.Append(filename) + if err != nil { + return nil + } + return blk } -func (t *TransactionMgr) blockSize() int { - return t.fm.BlockSize(); +func (t *TransactionMgr) blockSize() int { + return t.fm.BlockSize() } -func (t *TransactionMgr) AvailableBuffs() int { - return t.bm.Available(); +func (t *TransactionMgr) AvailableBuffs() int { + return t.bm.Available() } func (t *TransactionMgr) nextTxNumber() int64 { return atomic.AddInt64(&t.nextTxNum, 1) -} \ No newline at end of file +} + +func (t *TransactionMgr) FindCell(blk kfile.BlockId, key []byte) *kfile.Cell { + t.cm.sLock(blk) + buff := t.bufferList.getBuffer(blk) + return buff.Contents().FindCell(key) +} + +func (t *TransactionMgr) InsertCell(blk kfile.BlockId, key []byte, val any, okToLog bool) { + t.cm.xLock(blk) + buff := t.bufferList.getBuffer(blk) + lsn := -1 + if okToLog { + lsn = t.rm.setValue(buff, key, val) + } + cellKey := t.txtnum + cell := kfile.NewKVCell(cellKey) + p := buff.Contents() + p.InsertCell(cell) + buff.setModified(txnum, lsn) +} From 67c15e39c50fb17a883c5566d25bdf50b88a255b Mon Sep 17 00:00:00 2001 From: anthony4m Date: Wed, 5 Feb 2025 19:28:43 +0000 Subject: [PATCH 03/16] Refactor: Improve transaction manager and buffer list (1) Improve the transaction manager and buffer list by refactoring the Pin, UnPin, and Buffer functions. Add error handling and improve the overall design for better efficiency and clarity. Also, simplify the FindCell and InsertCell functions in transactionMgr.go. --- transaction/bufferlist.go | 55 +++++++++++++++++++++++++++++++++-- transaction/transactionMgr.go | 20 ++++++++----- 2 files changed, 64 insertions(+), 11 deletions(-) diff --git a/transaction/bufferlist.go b/transaction/bufferlist.go index c1fd2a7..0fe77e2 100644 --- a/transaction/bufferlist.go +++ b/transaction/bufferlist.go @@ -1,10 +1,59 @@ package transaction -import "ultraSQL/buffer" +import ( + "fmt" + "ultraSQL/buffer" + "ultraSQL/kfile" +) type BufferList struct { + bm *buffer.BufferMgr + buffers map[kfile.BlockId]*buffer.Buffer } -func NewBufferList(*buffer.BufferMgr) *BufferList { - return &BufferList{} +func NewBufferList(bm *buffer.BufferMgr) *BufferList { + return &BufferList{ + bm: bm, + buffers: make(map[kfile.BlockId]*buffer.Buffer), + } +} + +// Buffer retrieves a pinned Buffer (if any) for the given block +func (bl *BufferList) Buffer(blk kfile.BlockId) *buffer.Buffer { + return bl.buffers[blk] +} + +// Pin pins the specified block if it isn't already pinned in this BufferList +func (bl *BufferList) Pin(blk kfile.BlockId) error { + if _, exists := bl.buffers[blk]; exists { + // already pinned in this transaction + return nil + } + buff, err := bl.bm.Pin(&blk) + if err != nil { + return fmt.Errorf("failed to pin block %v: %w", blk, err) + } + bl.buffers[blk] = buff + return nil +} + +// Unpin unpins the specified block +func (bl *BufferList) Unpin(blk kfile.BlockId) error { + buff, exists := bl.buffers[blk] + if !exists { + // not pinned in this transaction + return nil + } + bl.bm.Unpin(buff) + delete(bl.buffers, blk) + return nil +} + +// UnpinAll unpins all blocks pinned by this BufferList +func (bl *BufferList) UnpinAll() { + for _, buff := range bl.buffers { + bl.bm.Unpin(buff) + } + // reset map + bl.buffers = make(map[kfile.BlockId]*buffer.Buffer) } diff --git a/transaction/transactionMgr.go b/transaction/transactionMgr.go index 54bfe0d..33ef0be 100644 --- a/transaction/transactionMgr.go +++ b/transaction/transactionMgr.go @@ -47,11 +47,11 @@ func (t *TransactionMgr) Recover() { t.recoveryMgr.recover() } -func (t *TransactionMgr) Pin(blk *kfile.BlockId) { - bufferList.Pin(blk) +func (t *TransactionMgr) Pin(blk kfile.BlockId) { + t.bufferList.Pin(blk) } -func (t *TransactionMgr) UnPin(blk *kfile.BlockId) { - bufferList.UnPin(blk) +func (t *TransactionMgr) UnPin(blk kfile.BlockId) { + t.bufferList.Unpin(blk) } func (t *TransactionMgr) Size(filename string) int { @@ -86,13 +86,17 @@ func (t *TransactionMgr) nextTxNumber() int64 { func (t *TransactionMgr) FindCell(blk kfile.BlockId, key []byte) *kfile.Cell { t.cm.sLock(blk) - buff := t.bufferList.getBuffer(blk) - return buff.Contents().FindCell(key) + buff := t.bufferList.Buffer(blk) + cell, _, err := buff.Contents().FindCell(key) + if err != nil { + return nil + } + return cell } func (t *TransactionMgr) InsertCell(blk kfile.BlockId, key []byte, val any, okToLog bool) { t.cm.xLock(blk) - buff := t.bufferList.getBuffer(blk) + buff := t.bufferList.Buffer(blk) lsn := -1 if okToLog { lsn = t.rm.setValue(buff, key, val) @@ -101,5 +105,5 @@ func (t *TransactionMgr) InsertCell(blk kfile.BlockId, key []byte, val any, okTo cell := kfile.NewKVCell(cellKey) p := buff.Contents() p.InsertCell(cell) - buff.setModified(txnum, lsn) + buff.MarkModified(t.txtnum, lsn) } From c46436bf401001afb3de492e352e562cb40cb591 Mon Sep 17 00:00:00 2001 From: anthony4m Date: Sat, 8 Feb 2025 14:15:35 +0000 Subject: [PATCH 04/16] feat: Enhance transaction management with error handling and new log record types --- concurrency/concurrencyMgr.go | 20 ++++++++++++++++++++ 1 file changed, 20 insertions(+) create mode 100644 concurrency/concurrencyMgr.go diff --git a/concurrency/concurrencyMgr.go b/concurrency/concurrencyMgr.go new file mode 100644 index 0000000..374b1ac --- /dev/null +++ b/concurrency/concurrencyMgr.go @@ -0,0 +1,20 @@ +package transaction + +import "ultraSQL/kfile" + +type ConcurrencyMgr struct { + lTble *LockTable + locks map[kfile.BlockId]string +} + +func NewConcurrencyMgr() *ConcurrencyMgr { + return &ConcurrencyMgr{ + locks: make(map[kfile.BlockId]string), + } +} + +func (cM *ConcurrencyMgr) sLock(blk kfile.BlockId) { + if _, exists := cM.locks[blk]; !exists { + cM.lTble.sLock + } +} From b179f67d7af1de6d076dc96798414afbdf2939e3 Mon Sep 17 00:00:00 2001 From: anthony4m Date: Sat, 8 Feb 2025 14:16:15 +0000 Subject: [PATCH 05/16] feat: Add locking mechanism and log record interface for transaction management --- log/IRecord.go | 7 +++++++ 1 file changed, 7 insertions(+) create mode 100644 log/IRecord.go diff --git a/log/IRecord.go b/log/IRecord.go new file mode 100644 index 0000000..876219f --- /dev/null +++ b/log/IRecord.go @@ -0,0 +1,7 @@ +package log + +type LogRecord interface { + Op() int + TxNumber() int + Undo(txnum int) error +} From b1de1583b0e8008b162e4ca21fa3563104310dad Mon Sep 17 00:00:00 2001 From: anthony4m Date: Sat, 8 Feb 2025 14:17:27 +0000 Subject: [PATCH 06/16] #5 feat: Implement locking mechanism and log record creation for transaction management --- concurrency/lockTable.go | 75 ++++++++++++++++++++++++++++++++++++++++ 1 file changed, 75 insertions(+) create mode 100644 concurrency/lockTable.go diff --git a/concurrency/lockTable.go b/concurrency/lockTable.go new file mode 100644 index 0000000..387816a --- /dev/null +++ b/concurrency/lockTable.go @@ -0,0 +1,75 @@ +package transaction + +import ( + "fmt" + "sync" + "time" + "ultraSQL/kfile" +) + +const MaxTime = 10000 + +type LockTable struct { + locks map[kfile.BlockId]int + mu sync.RWMutex + cond *sync.Cond +} + +func NewLockTable() *LockTable { + lt := &LockTable{ + locks: make(map[kfile.BlockId]int), + } + lt.cond = sync.NewCond(<.mu) + return lt +} + +func (lT *LockTable) sLock(blk kfile.BlockId) error { + lT.mu.Lock() + defer lT.mu.Unlock() + + deadline := time.Now().Add(MaxTime) + + for lT.hasXLock(blk) { + if time.Now().After(deadline) { + return fmt.Errorf("lock acquisition timed out") + } + lT.cond.Wait() + } + + val := lT.getLockVal(blk) + lT.locks[blk] = val + 1 + return nil +} + +func (lT *LockTable) xLock(blk kfile.BlockId) error { + lT.mu.Lock() + defer lT.mu.Unlock() + + deadline := time.Now().Add(MaxTime) + + for lT.hasOtherSLocks(blk) { + if time.Now().After(deadline) { + return fmt.Errorf("lock acquisition timed out") + } + lT.cond.Wait() + } + + lT.locks[blk] = -1 + return nil +} + +func (lT *LockTable) hasXLock(blk kfile.BlockId) bool { + return lT.getLockVal(blk) < 0 +} + +func (lT *LockTable) getLockVal(blk kfile.BlockId) int { + val, exist := lT.locks[blk] + if exist { + return 0 + } + return val +} + +func (lT *LockTable) hasOtherSLocks(blk kfile.BlockId) bool { + return lT.getLockVal(blk) > 1 +} From 8d68462daeda865d3d9328cb7c5359caa2c00a59 Mon Sep 17 00:00:00 2001 From: anthony4m Date: Sat, 8 Feb 2025 14:17:41 +0000 Subject: [PATCH 07/16] #5 feat: Enhance transaction manager with error handling and new log record creation --- log/log_record.go | 42 ++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 42 insertions(+) create mode 100644 log/log_record.go diff --git a/log/log_record.go b/log/log_record.go new file mode 100644 index 0000000..97c951b --- /dev/null +++ b/log/log_record.go @@ -0,0 +1,42 @@ +package log + +import ( + "fmt" + "ultraSQL/kfile" +) + +const ( + CHECKPOINT = iota + START + COMMIT + ROLLBACK + SETINT + SETSTRING +) + +// CreateLogRecord is a package-level function that inspects the bytes +// to figure out which concrete record to instantiate. +func CreateLogRecord(data []byte) (LogRecord, error) { + // Suppose we have some helper "Page" type to interpret 'data'. + cell := kfile.NewKVCell(data) // you'll define or import this + cell.SetValue(data) + p := kfile.NewSlottedPage(0) // you'll define or import this + + recordType := p.InsertCell(cell) + switch recordType { + case CHECKPOINT: + return NewCheckpointRecord(), nil + case START: + return NewStartRecord(p), nil + case COMMIT: + return NewCommitRecord(p), nil + case ROLLBACK: + return NewRollbackRecord(p), nil + case SETINT: + return NewSetIntRecord(p), nil + case SETSTRING: + return NewSetStringRecord(p), nil + default: + return nil, fmt.Errorf("unknown log record type: %d", recordType) + } +} From 2cc3ebec1eeed7a6cdf3bc2333ebc377d4d85b38 Mon Sep 17 00:00:00 2001 From: anthony4m Date: Sat, 8 Feb 2025 14:17:55 +0000 Subject: [PATCH 08/16] #5 feat: Add error handling to Pin, UnPin, and InsertCell methods in transaction manager --- transaction/transactionMgr.go | 26 +++++++++++++++++++------- 1 file changed, 19 insertions(+), 7 deletions(-) diff --git a/transaction/transactionMgr.go b/transaction/transactionMgr.go index 33ef0be..b8fd11e 100644 --- a/transaction/transactionMgr.go +++ b/transaction/transactionMgr.go @@ -1,6 +1,7 @@ package transaction import ( + "fmt" "sync/atomic" "ultraSQL/buffer" "ultraSQL/kfile" @@ -47,11 +48,18 @@ func (t *TransactionMgr) Recover() { t.recoveryMgr.recover() } -func (t *TransactionMgr) Pin(blk kfile.BlockId) { - t.bufferList.Pin(blk) +func (t *TransactionMgr) Pin(blk kfile.BlockId) error { + err := t.bufferList.Pin(blk) + if err != nil { + return fmt.Errorf("failed to pin block %v: %w", blk, err) + } + return nil } -func (t *TransactionMgr) UnPin(blk kfile.BlockId) { - t.bufferList.Unpin(blk) +func (t *TransactionMgr) UnPin(blk kfile.BlockId) error { + err := t.bufferList.Unpin(blk) + if err != nil { + return fmt.Errorf("failed to pin block %v: %w", blk, err) + } } func (t *TransactionMgr) Size(filename string) int { @@ -94,16 +102,20 @@ func (t *TransactionMgr) FindCell(blk kfile.BlockId, key []byte) *kfile.Cell { return cell } -func (t *TransactionMgr) InsertCell(blk kfile.BlockId, key []byte, val any, okToLog bool) { +func (t *TransactionMgr) InsertCell(blk kfile.BlockId, key []byte, val any, okToLog bool) error { t.cm.xLock(blk) buff := t.bufferList.Buffer(blk) lsn := -1 if okToLog { lsn = t.rm.setValue(buff, key, val) } - cellKey := t.txtnum + cellKey := key cell := kfile.NewKVCell(cellKey) p := buff.Contents() - p.InsertCell(cell) + err := p.InsertCell(cell) + if err != nil { + return fmt.Errorf("failed to pin block %v: %w", blk, err) + } buff.MarkModified(t.txtnum, lsn) + return nil } From 93dfb9987153ade2240e2234ba662e0f75c9909c Mon Sep 17 00:00:00 2001 From: anthony4m Date: Sun, 9 Feb 2025 20:46:47 +0000 Subject: [PATCH 09/16] Refactor: Rename GetFileName to FileName Renamed `GetFileName` method to `FileName` for better readability and consistency. Updated all references to the renamed method. (transaction) --- kfile/file.go | 2 +- kfile/fileMgr.go | 18 +++++++++--------- kfile/file__dir_test.go | 18 +++++++++--------- 3 files changed, 19 insertions(+), 19 deletions(-) diff --git a/kfile/file.go b/kfile/file.go index fc1171b..1619f21 100644 --- a/kfile/file.go +++ b/kfile/file.go @@ -23,7 +23,7 @@ func NewBlockId(filename string, blknum int) *BlockId { } } -func (b *BlockId) GetFileName() string { +func (b *BlockId) FileName() string { return b.Filename } diff --git a/kfile/fileMgr.go b/kfile/fileMgr.go index d8d65a6..5c52dc0 100644 --- a/kfile/fileMgr.go +++ b/kfile/fileMgr.go @@ -112,7 +112,7 @@ func (fm *FileMgr) PreallocateFile(blk *BlockId, size int64) error { return err } - filename := blk.GetFileName() + filename := blk.FileName() if err := fm.validatePermissions(); err != nil { return err } @@ -125,7 +125,7 @@ func (fm *FileMgr) validatePreallocationParams(blk *BlockId, size int64) error { if size%int64(fm.blocksize) != 0 { return fmt.Errorf("size must be a multiple of blocksize %d", fm.blocksize) } - if blk.GetFileName() == "" { + if blk.FileName() == "" { return fmt.Errorf("invalid filename") } return nil @@ -193,14 +193,14 @@ func (fm *FileMgr) Read(blk *BlockId, p *SlottedPage) error { fm.mutex.RLock() defer fm.mutex.RUnlock() - f, err := fm.getFile(blk.GetFileName()) + f, err := fm.getFile(blk.FileName()) if err != nil { return fmt.Errorf("failed to get file for block %v: %w", blk, err) } offset := int64(blk.Number() * fm.blocksize) if _, err = f.Seek(offset, io.SeekStart); err != nil { - return fmt.Errorf(seekErrFormat, offset, blk.GetFileName(), err) + return fmt.Errorf(seekErrFormat, offset, blk.FileName(), err) } bytesRead, err := f.Read(p.Contents()) if err != nil { @@ -224,14 +224,14 @@ func (fm *FileMgr) Write(blk *BlockId, p *SlottedPage) error { fm.mutex.Lock() defer fm.mutex.Unlock() - f, err := fm.getFile(blk.GetFileName()) + f, err := fm.getFile(blk.FileName()) if err != nil { return fmt.Errorf("failed to get file for block %v: %w", blk, err) } offset := int64(blk.Number() * fm.blocksize) if _, err = f.Seek(offset, io.SeekStart); err != nil { - return fmt.Errorf(seekErrFormat, offset, blk.GetFileName(), err) + return fmt.Errorf(seekErrFormat, offset, blk.FileName(), err) } bytesWritten, err := f.Write(p.Contents()) if err != nil { @@ -241,7 +241,7 @@ func (fm *FileMgr) Write(blk *BlockId, p *SlottedPage) error { return fmt.Errorf("incomplete write: expected %d bytes, wrote %d", fm.blocksize, bytesWritten) } if err = f.Sync(); err != nil { - return fmt.Errorf("failed to sync file %s: %w", blk.GetFileName(), err) + return fmt.Errorf("failed to sync file %s: %w", blk.FileName(), err) } fm.blocksWritten++ @@ -379,7 +379,7 @@ func (fm *FileMgr) WriteLog() []ReadWriteLogEntry { // ensureFileSize ensures the file has at least the required number of blocks. func (fm *FileMgr) ensureFileSize(blk *BlockId, requiredBlocks int) error { - currentBlocks, err := fm.Length(blk.GetFileName()) + currentBlocks, err := fm.Length(blk.FileName()) if err != nil { return err } @@ -399,7 +399,7 @@ func (fm *FileMgr) RenameFile(blk *BlockId, newFileName string) error { return fmt.Errorf("invalid new filename: %s", newFileName) } - oldFileName := blk.GetFileName() + oldFileName := blk.FileName() // Close the old file if it is open. fm.openFilesLock.Lock() diff --git a/kfile/file__dir_test.go b/kfile/file__dir_test.go index 183f93a..03121c5 100644 --- a/kfile/file__dir_test.go +++ b/kfile/file__dir_test.go @@ -46,8 +46,8 @@ func TestBlock(t *testing.T) { Blknum := 5 blk := NewBlockId(Filename, Blknum) - if blk.GetFileName() != Filename { - t.Errorf("Expected Filename %s, got %s", Filename, blk.GetFileName()) + if blk.FileName() != Filename { + t.Errorf("Expected Filename %s, got %s", Filename, blk.FileName()) } if blk.Number() != Blknum { @@ -126,13 +126,13 @@ func TestBlock(t *testing.T) { // Test NextBlock next := blk.NextBlock() - if next.Number() != 6 || next.GetFileName() != "test.db" { + if next.Number() != 6 || next.FileName() != "test.db" { t.Error("NextBlock returned incorrect block") } // Test PrevBlock prev := blk.PrevBlock() - if prev.Number() != 4 || prev.GetFileName() != "test.db" { + if prev.Number() != 4 || prev.FileName() != "test.db" { t.Error("PrevBlock returned incorrect block") } @@ -372,8 +372,8 @@ func TestBlockId(t *testing.T) { blknum := 5 blk := NewBlockId(filename, blknum) - if blk.GetFileName() != filename { - t.Errorf("Expected Filename %s, got %s", filename, blk.GetFileName()) + if blk.FileName() != filename { + t.Errorf("Expected Filename %s, got %s", filename, blk.FileName()) } if blk.Number() != blknum { @@ -448,12 +448,12 @@ func TestBlockId(t *testing.T) { blk := NewBlockId("test.db", 5) next := blk.NextBlock() - if next.Number() != 6 || next.GetFileName() != "test.db" { + if next.Number() != 6 || next.FileName() != "test.db" { t.Error("NextBlock returned incorrect block") } prev := blk.PrevBlock() - if prev.Number() != 4 || prev.GetFileName() != "test.db" { + if prev.Number() != 4 || prev.FileName() != "test.db" { t.Error("PrevBlock returned incorrect block") } @@ -779,7 +779,7 @@ func TestFileRename(t *testing.T) { t.Errorf("Could not rename file %s", err) } want := new_file - got := blk.GetFileName() + got := blk.FileName() if want != got { t.Errorf("want %s but got %s", want, got) } From ccf8bb503bb51475f7a602e30a49f5d60c69c28a Mon Sep 17 00:00:00 2001 From: anthony4m Date: Sun, 9 Feb 2025 20:47:18 +0000 Subject: [PATCH 10/16] feat(transaction): Add Buffer() method to LogMgr Add a method to access the internal buffer of the LogMgr struct. This allows external access to the buffer for inspection or manipulation. --- log/logmgr.go | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/log/logmgr.go b/log/logmgr.go index b66d3b2..d4c3d81 100644 --- a/log/logmgr.go +++ b/log/logmgr.go @@ -212,3 +212,7 @@ func (lm *LogMgr) ValidateKey(key []byte) bool { generatedKey := lm.GenerateKey() return bytes.Compare(key, generatedKey) == 0 } + +func (lm *LogMgr) Buffer() *buffer.Buffer { + return lm.logBuffer +} From c94d5ddd764608a766ed19733f145dc9b266a9a7 Mon Sep 17 00:00:00 2001 From: anthony4m Date: Sun, 9 Feb 2025 20:47:44 +0000 Subject: [PATCH 11/16] feat: Implement concurrency manager (transaction) Refactor concurrency management to use a more robust and thread-safe implementation. Move the `ConcurrencyMgr` to the `concurrency` package. Add shared and exclusive locking mechanisms with error handling and a release method. Implement two-phase locking protocol. --- concurrency/concurrencyMgr.go | 89 +++++++++++++++++++++++++++++++++-- transaction/concurrencyMgr.go | 8 ---- 2 files changed, 85 insertions(+), 12 deletions(-) delete mode 100644 transaction/concurrencyMgr.go diff --git a/concurrency/concurrencyMgr.go b/concurrency/concurrencyMgr.go index 374b1ac..616bf69 100644 --- a/concurrency/concurrencyMgr.go +++ b/concurrency/concurrencyMgr.go @@ -1,10 +1,15 @@ -package transaction +package concurrency -import "ultraSQL/kfile" +import ( + "fmt" + "sync" + "ultraSQL/kfile" +) type ConcurrencyMgr struct { lTble *LockTable locks map[kfile.BlockId]string + mu sync.RWMutex // Protect shared map access } func NewConcurrencyMgr() *ConcurrencyMgr { @@ -13,8 +18,84 @@ func NewConcurrencyMgr() *ConcurrencyMgr { } } -func (cM *ConcurrencyMgr) sLock(blk kfile.BlockId) { +func (cM *ConcurrencyMgr) SLock(blk kfile.BlockId) error { + cM.mu.Lock() + defer cM.mu.Unlock() + + // If we already have any lock (S or X), no need to acquire again + if _, exists := cM.locks[blk]; exists { + return nil + } + + err := cM.lTble.sLock(blk) + if err != nil { + return fmt.Errorf("failed to acquire shared lock: %w", err) + } + + cM.locks[blk] = "S" + return nil +} + +func (cM *ConcurrencyMgr) XLock(blk kfile.BlockId) error { + cM.mu.Lock() + defer cM.mu.Unlock() + + // If we already have an X lock, no need to acquire again + if cM.hasXLock(blk) { + return nil + } + + // Following the two-phase locking protocol: + // 1. First acquire S lock if we don't have any lock if _, exists := cM.locks[blk]; !exists { - cM.lTble.sLock + err := cM.lTble.sLock(blk) + if err != nil { + return fmt.Errorf("failed to acquire initial shared lock: %w", err) + } + cM.locks[blk] = "S" } + + // 2. Then upgrade to X lock + err := cM.lTble.xLock(blk) + if err != nil { + return fmt.Errorf("failed to upgrade to exclusive lock: %w", err) + } + + cM.locks[blk] = "X" + return nil +} + +func (cM *ConcurrencyMgr) Release() error { + cM.mu.Lock() + defer cM.mu.Unlock() + + var errs []error + for blk := range cM.locks { + if err := cM.lTble.unlock(blk); err != nil { + errs = append(errs, fmt.Errorf("failed to release lock for block %v: %w", blk, err)) + } + } + + // Clear the locks map regardless of errors + cM.locks = make(map[kfile.BlockId]string) + + if len(errs) > 0 { + return fmt.Errorf("errors during release: %v", errs) + } + return nil +} + +func (cM *ConcurrencyMgr) hasXLock(blk kfile.BlockId) bool { + // Note: Caller must hold mutex + lockType, ok := cM.locks[blk] + return ok && lockType == "X" +} + +// Helper method to check current lock status +func (cM *ConcurrencyMgr) GetLockType(blk kfile.BlockId) (string, bool) { + cM.mu.RLock() + defer cM.mu.RUnlock() + + lockType, exists := cM.locks[blk] + return lockType, exists } diff --git a/transaction/concurrencyMgr.go b/transaction/concurrencyMgr.go deleted file mode 100644 index ed6b716..0000000 --- a/transaction/concurrencyMgr.go +++ /dev/null @@ -1,8 +0,0 @@ -package transaction - -type ConcurrencyMgr struct { -} - -func NewConcurrencyMgr() *ConcurrencyMgr { - return &ConcurrencyMgr{} -} From 28b0039f9f8aadc69acb777e2a58fe9c6133aa7b Mon Sep 17 00:00:00 2001 From: anthony4m Date: Sun, 9 Feb 2025 20:48:50 +0000 Subject: [PATCH 12/16] #5 feat: Improve lock table concurrency and error handling (1) Refactor lock table to use time.Second for timeout and improve error messages. Add unlock functionality and GetLockInfo helper method. Change package name from transaction to concurrency. --- concurrency/lockTable.go | 63 ++++++++++++++++++++++++++++++++-------- 1 file changed, 51 insertions(+), 12 deletions(-) diff --git a/concurrency/lockTable.go b/concurrency/lockTable.go index 387816a..a345d35 100644 --- a/concurrency/lockTable.go +++ b/concurrency/lockTable.go @@ -1,4 +1,4 @@ -package transaction +package concurrency import ( "fmt" @@ -7,10 +7,10 @@ import ( "ultraSQL/kfile" ) -const MaxTime = 10000 +const MaxWaitTime = 10 * time.Second type LockTable struct { - locks map[kfile.BlockId]int + locks map[kfile.BlockId]int // positive: number of shared locks, negative: exclusive lock mu sync.RWMutex cond *sync.Cond } @@ -27,15 +27,17 @@ func (lT *LockTable) sLock(blk kfile.BlockId) error { lT.mu.Lock() defer lT.mu.Unlock() - deadline := time.Now().Add(MaxTime) + deadline := time.Now().Add(MaxWaitTime) + // Wait while there's an exclusive lock on the block for lT.hasXLock(blk) { if time.Now().After(deadline) { - return fmt.Errorf("lock acquisition timed out") + return fmt.Errorf("shared lock acquisition timed out for block %v", blk) } lT.cond.Wait() } + // Increment the number of shared locks (or initialize to 1) val := lT.getLockVal(blk) lT.locks[blk] = val + 1 return nil @@ -45,15 +47,17 @@ func (lT *LockTable) xLock(blk kfile.BlockId) error { lT.mu.Lock() defer lT.mu.Unlock() - deadline := time.Now().Add(MaxTime) + deadline := time.Now().Add(MaxWaitTime) - for lT.hasOtherSLocks(blk) { + // Wait while there are other locks (shared or exclusive) + for lT.hasOtherLocks(blk) { if time.Now().After(deadline) { - return fmt.Errorf("lock acquisition timed out") + return fmt.Errorf("exclusive lock acquisition timed out for block %v", blk) } lT.cond.Wait() } + // Set to -1 to indicate exclusive lock lT.locks[blk] = -1 return nil } @@ -63,13 +67,48 @@ func (lT *LockTable) hasXLock(blk kfile.BlockId) bool { } func (lT *LockTable) getLockVal(blk kfile.BlockId) int { - val, exist := lT.locks[blk] - if exist { + val, exists := lT.locks[blk] + if !exists { return 0 } return val } -func (lT *LockTable) hasOtherSLocks(blk kfile.BlockId) bool { - return lT.getLockVal(blk) > 1 +func (lT *LockTable) hasOtherLocks(blk kfile.BlockId) bool { + val := lT.getLockVal(blk) + return val != 0 && val != 1 // Allow upgrade from single shared lock +} + +func (lT *LockTable) unlock(blk kfile.BlockId) error { + lT.mu.Lock() + defer lT.mu.Unlock() + + val := lT.getLockVal(blk) + if val == 0 { + return fmt.Errorf("attempting to unlock block %v which is not locked", blk) + } + + if val > 1 { + // Decrement shared lock count + lT.locks[blk] = val - 1 + } else { + // Remove last shared lock or exclusive lock + delete(lT.locks, blk) + lT.cond.Broadcast() // Wake up waiting goroutines + } + return nil +} + +// Helper method to get lock information +func (lT *LockTable) GetLockInfo(blk kfile.BlockId) (lockType string, count int) { + lT.mu.RLock() + defer lT.mu.RUnlock() + + val := lT.getLockVal(blk) + if val < 0 { + return "exclusive", 1 + } else if val > 0 { + return "shared", val + } + return "none", 0 } From d75d136da85dcaad6c58ab974a4bbb83f8be2eec Mon Sep 17 00:00:00 2001 From: anthony4m Date: Sun, 9 Feb 2025 20:49:22 +0000 Subject: [PATCH 13/16] feat: Implement unified update log record (main) This commit implements a new log record type, `UnifiedUpdateRecord`, for handling unified updates in the logging system. It includes serialization and deserialization methods, as well as an `Undo` method for transaction rollback. The `CreateLogRecord` function is updated to handle this new record type. The old `log_record.go` file has been removed and replaced with a new, improved version. --- log/log_record.go | 42 --------- log_record/log_record.go | 180 +++++++++++++++++++++++++++++++++++++++ 2 files changed, 180 insertions(+), 42 deletions(-) delete mode 100644 log/log_record.go create mode 100644 log_record/log_record.go diff --git a/log/log_record.go b/log/log_record.go deleted file mode 100644 index 97c951b..0000000 --- a/log/log_record.go +++ /dev/null @@ -1,42 +0,0 @@ -package log - -import ( - "fmt" - "ultraSQL/kfile" -) - -const ( - CHECKPOINT = iota - START - COMMIT - ROLLBACK - SETINT - SETSTRING -) - -// CreateLogRecord is a package-level function that inspects the bytes -// to figure out which concrete record to instantiate. -func CreateLogRecord(data []byte) (LogRecord, error) { - // Suppose we have some helper "Page" type to interpret 'data'. - cell := kfile.NewKVCell(data) // you'll define or import this - cell.SetValue(data) - p := kfile.NewSlottedPage(0) // you'll define or import this - - recordType := p.InsertCell(cell) - switch recordType { - case CHECKPOINT: - return NewCheckpointRecord(), nil - case START: - return NewStartRecord(p), nil - case COMMIT: - return NewCommitRecord(p), nil - case ROLLBACK: - return NewRollbackRecord(p), nil - case SETINT: - return NewSetIntRecord(p), nil - case SETSTRING: - return NewSetStringRecord(p), nil - default: - return nil, fmt.Errorf("unknown log record type: %d", recordType) - } -} diff --git a/log_record/log_record.go b/log_record/log_record.go new file mode 100644 index 0000000..ed676b6 --- /dev/null +++ b/log_record/log_record.go @@ -0,0 +1,180 @@ +package log_record + +import ( + "bytes" + "encoding/binary" + "fmt" + "ultraSQL/kfile" + _ "ultraSQL/kfile" + "ultraSQL/log" + "ultraSQL/transaction" +) + +// Example op code if you're not using separate ones. +const UNIFIEDUPDATE = 100 + +type UnifiedUpdateRecord struct { + txNum int64 + blkFile string + blkNum int + key []byte + oldBytes []byte + newBytes []byte +} + +func WriteUnifiedUpdateLogRecord( + lm *log.LogMgr, + txNum int64, + blk *kfile.BlockId, + key []byte, + oldBytes []byte, + newBytes []byte, +) int { + // Implementation up to you; typically you’d: + // 1) Create a record structure (e.g. UnifiedUpdateRecord). + // 2) Serialize txNum, block info, slotIndex, oldBytes, newBytes. + // 3) Append to log manager, returning the LSN. + return 0 // placeholder +} + +// Ensure it satisfies the LogRecord transaction_interface +func (rec *UnifiedUpdateRecord) Op() int { + return UNIFIEDUPDATE +} +func (rec *UnifiedUpdateRecord) TxNumber() int64 { + return rec.txNum +} + +// Undo reverts the page/slot to oldBytes +func (rec *UnifiedUpdateRecord) Undo(tx *transaction.TransactionMgr) { + // 1) Pin or fetch the buffer for rec.blkFile, rec.blkNum + // 2) Cast to SlottedPage + // 3) Overwrite the cell at rec.slotIndex with oldBytes + fmt.Printf("Undoing unified update: restoring old cell bytes for tx=%d slot=%d\n", rec.txNum, rec.slotIndex) + // ... actual code ... +} + +// Serialize the record to bytes +func (rec *UnifiedUpdateRecord) ToBytes() []byte { + buf := new(bytes.Buffer) + // 1. Op code + _ = binary.Write(buf, binary.BigEndian, int32(UNIFIEDUPDATE)) + // 2. txNum + _ = binary.Write(buf, binary.BigEndian, rec.txNum) + + // 3. block info + writeString(buf, rec.blkFile) + _ = binary.Write(buf, binary.BigEndian, int32(rec.blkNum)) + _ = binary.Write(buf, binary.BigEndian, int32(rec.slotIndex)) + + // 4. oldBytes + writeBytes(buf, rec.oldBytes) + // 5. newBytes + writeBytes(buf, rec.newBytes) + return buf.Bytes() +} + +// parse UnifiedUpdateRecord from bytes +func FromBytesUnifiedUpdate(data []byte) (*UnifiedUpdateRecord, error) { + buf := bytes.NewReader(data) + + var op int32 + if err := binary.Read(buf, binary.BigEndian, &op); err != nil { + return nil, err + } + if op != UNIFIEDUPDATE { + return nil, fmt.Errorf("not a unified update record") + } + + var txNum int64 + if err := binary.Read(buf, binary.BigEndian, &txNum); err != nil { + return nil, err + } + + blkFile, err := readString(buf) + if err != nil { + return nil, err + } + + var blkNum int32 + if err := binary.Read(buf, binary.BigEndian, &blkNum); err != nil { + return nil, err + } + var slotIndex int32 + if err := binary.Read(buf, binary.BigEndian, &slotIndex); err != nil { + return nil, err + } + + oldBytes, err := readBytes(buf) + if err != nil { + return nil, err + } + newBytes, err := readBytes(buf) + if err != nil { + return nil, err + } + + return &UnifiedUpdateRecord{ + txNum: txNum, + blkFile: blkFile, + blkNum: int(blkNum), + slotIndex: int(slotIndex), + oldBytes: oldBytes, + newBytes: newBytes, + }, nil +} + +// Helpers for writing/reading strings/bytes: + +func writeString(buf *bytes.Buffer, s string) { + writeBytes(buf, []byte(s)) +} +func readString(buf *bytes.Reader) (string, error) { + b, err := readBytes(buf) + if err != nil { + return "", err + } + return string(b), nil +} +func writeBytes(buf *bytes.Buffer, data []byte) { + _ = binary.Write(buf, binary.BigEndian, int32(len(data))) + buf.Write(data) +} +func readBytes(buf *bytes.Reader) ([]byte, error) { + var length int32 + if err := binary.Read(buf, binary.BigEndian, &length); err != nil { + return nil, err + } + if length < 0 { + return nil, fmt.Errorf("negative length") + } + b := make([]byte, length) + n, err := buf.Read(b) + if err != nil || n != int(length) { + return nil, fmt.Errorf("failed to read bytes") + } + return b, nil +} + +func CreateLogRecord(data []byte) log.LogRecord { + // Peek at op code + if len(data) < 4 { + return nil + } + op := int32(binary.BigEndian.Uint32(data[0:4])) + switch op { + case log.CHECKPOINT: + return NewCheckpointRecordFromBytes(data) + case log.START: + return NewStartRecordFromBytes(data) + case log.COMMIT: + return NewCommitRecordFromBytes(data) + case log.ROLLBACK: + return NewRollbackRecordFromBytes(data) + case UNIFIEDUPDATE: + rec, _ := FromBytesUnifiedUpdate(data) + return rec + default: + return nil + } +} From b39bf4398569ba1692224781840fcb7e2694b09e Mon Sep 17 00:00:00 2001 From: anthony4m Date: Sun, 9 Feb 2025 20:49:40 +0000 Subject: [PATCH 14/16] Fix: Use correct method to get block filename (#123 (transaction)) Corrected the method used to retrieve the block filename in LogIterator. The `GetFileName` method was replaced with `FileName` for accuracy. --- utils/LogIterator.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/utils/LogIterator.go b/utils/LogIterator.go index cded21e..418ab9e 100644 --- a/utils/LogIterator.go +++ b/utils/LogIterator.go @@ -43,7 +43,7 @@ func (it *LogIterator) Next() ([]byte, error) { // strictly speaking, we have no next record return nil, fmt.Errorf("no more records in block 0") } - newBlk := kfile.NewBlockId(it.blk.GetFileName(), it.blk.Number()-1) + newBlk := kfile.NewBlockId(it.blk.FileName(), it.blk.Number()-1) if err := it.moveToBlock(newBlk); err != nil { return nil, err } From e62a477741a9760a196409ce50561f8ec79909ad Mon Sep 17 00:00:00 2001 From: anthony4m Date: Sun, 9 Feb 2025 20:50:20 +0000 Subject: [PATCH 15/16] #5 feat: Implement recovery manager (transaction) This commit implements a recovery manager that handles logging, commit, rollback, and recovery operations for transactions. The `recoveryMgr.go` file in the `recovery` package now contains the implementation. The previous `recoveryMgr.go` file in the `transaction` package has been removed. The recovery manager uses unified log records to efficiently handle undo and redo operations. --- recovery/recoveryMgr.go | 159 +++++++++++++++++++++++++++++++++++++ transaction/recoveryMgr.go | 17 ---- 2 files changed, 159 insertions(+), 17 deletions(-) create mode 100644 recovery/recoveryMgr.go delete mode 100644 transaction/recoveryMgr.go diff --git a/recovery/recoveryMgr.go b/recovery/recoveryMgr.go new file mode 100644 index 0000000..58b91ec --- /dev/null +++ b/recovery/recoveryMgr.go @@ -0,0 +1,159 @@ +package recovery + +import ( + "fmt" + "ultraSQL/buffer" + "ultraSQL/log" + "ultraSQL/log_record" + "ultraSQL/transaction" +) + +// RecoveryMgr manages the logging and recovery for a given transaction. +type RecoveryMgr struct { + lm *log.LogMgr + bm *buffer.BufferMgr + tx *transaction.TransactionMgr + txNum int64 +} + +// NewRecoveryMgr is analogous to the Java constructor: +// +// public RecoveryMgr(Transaction tx, int txnum, LogMgr lm, BufferMgr bm) +func NewRecoveryMgr(tx *transaction.TransactionMgr, txNum int64, lm *log.LogMgr, bm *buffer.BufferMgr) *RecoveryMgr { + rm := &RecoveryMgr{ + tx: tx, + txNum: txNum, + lm: lm, + bm: bm, + } + // Write a START record to the log + StartRecordWriteToLog(lm, txNum) // e.g., StartRecord.writeToLog(lm, txNum) + return rm +} + +// Commit is analogous to public void commit() +func (r *RecoveryMgr) Commit() { + // 1. Flush all buffers associated with this transaction + r.bm.Policy().FlushAll(r.txNum) + + // 2. Write COMMIT record to the log + lsn := CommitRecordWriteToLog(r.lm, r.txNum) // e.g., CommitRecord.writeToLog(r.lm, r.txNum) + + // 3. Force the log up to that LSN + flushErr := r.lm.Buffer().FlushLSN(lsn) + if flushErr != nil { + fmt.Printf("error occurred during commit flush: %v\n", flushErr) + } +} + +// Rollback is analogous to public void rollback() +func (r *RecoveryMgr) Rollback() { + r.doRollback() + r.bm.Policy().FlushAll(r.txNum) + lsn := RollbackRecordWriteToLog(r.lm, r.txNum) + flushErr := r.lm.Buffer().FlushLSN(lsn) + if flushErr != nil { + fmt.Printf("error occurred during rollback flush: %v\n", flushErr) + } +} + +// Recover is analogous to public void recover() +func (r *RecoveryMgr) Recover() { + r.doRecover() + r.bm.Policy().FlushAll(r.txNum) + lsn := CheckpointRecordWriteToLog(r.lm) // e.g., CheckpointRecord.writeToLog(lm) + flushErr := r.lm.Buffer().FlushLSN(lsn) + if flushErr != nil { + fmt.Printf("error occurred during recovery flush: %v\n", flushErr) + } +} + +// SetCellValue updates the cell in a slotted page, then writes a unified log record +// that stores the old/new serialized cell bytes for undo/redo. +func (r *RecoveryMgr) SetCellValue(buff *buffer.Buffer, key []byte, newVal any) (int, error) { + // 1. Get the slotted page from the buffer. + sp := buff.Contents() + + // 2. Retrieve the cell at the given slot. + cell, _, err := sp.FindCell(key) + if err != nil { + return -1, fmt.Errorf("failed to get cell at slot %d: %w", key, err) + } + + // 3. Serialize the current (old) cell state. + oldBytes := cell.ToBytes() + + // 4. Update the cell with the new value (the cell handles type encoding). + if err := cell.SetValue(newVal); err != nil { + return -1, fmt.Errorf("failed to set cell value: %w", err) + } + + // 5. Serialize the new cell state. + newBytes := cell.ToBytes() + + // 6. Write a unified update record to the log: includes txNum, block ID, slotIndex, oldBytes, newBytes. + blk := buff.Block() // or any *BlockId if your Buffer returns it + lsn := log_record.WriteUnifiedUpdateLogRecord(r.lm, r.txNum, blk, key, oldBytes, newBytes) + + // 7. Return the LSN so the caller can handle further flush or keep track of it. + return lsn, nil +} + +// doRollback performs a backward scan of the log to undo any record belonging to this transaction. +func (r *RecoveryMgr) doRollback() { + iter, err := r.lm.Iterator() + if err != nil { + fmt.Printf("error occurred creating log iterator: %v\n", err) + return + } + for iter.HasNext() { + data, err := iter.Next() + if err != nil { + fmt.Printf("error occurred reading next log record: %v\n", err) + return + } + rec := CreateLogRecord(data) // e.g. UnifiedUpdateRecord or other record + if rec == nil { + continue + } + if rec.TxNumber() == r.txNum { + if rec.Op() == START { + // Once we reach the START record for our transaction, we stop + return + } + rec.Undo(r.tx) // "Undo" is record-specific logic + } + } +} + +// doRecover replays the log from the end, undoing updates for transactions that never committed. +func (r *RecoveryMgr) doRecover() { + finishedTxs := make(map[int64]bool) + + iter, err := r.lm.Iterator() + if err != nil { + fmt.Printf("error occurred creating log iterator: %v\n", err) + return + } + for iter.HasNext() { + data, err := iter.Next() + if err != nil { + fmt.Printf("error occurred reading next log record: %v\n", err) + return + } + rec := CreateLogRecord(data) + if rec == nil { + continue + } + switch rec.Op() { + case CHECKPOINT: + return + case COMMIT, ROLLBACK: + finishedTxs[rec.TxNumber()] = true + default: + if !finishedTxs[rec.TxNumber()] { + rec.Undo(r.tx) + } + } + } +} diff --git a/transaction/recoveryMgr.go b/transaction/recoveryMgr.go deleted file mode 100644 index 9f92640..0000000 --- a/transaction/recoveryMgr.go +++ /dev/null @@ -1,17 +0,0 @@ -package transaction - -import ( - "ultraSQL/buffer" - "ultraSQL/log" -) - -type RecoveryMgr struct { -} - -func NewRecoveryMgr(txMgr *TransactionMgr, txtnum int64, lm *log.LogMgr, bm *buffer.BufferMgr) *RecoveryMgr { - return &RecoveryMgr{} -} - -func (r *RecoveryMgr) Commit() { - -} From 7b2280568ca9561912e6f66b42acdf2a533ae3c9 Mon Sep 17 00:00:00 2001 From: anthony4m Date: Sun, 9 Feb 2025 20:50:54 +0000 Subject: [PATCH 16/16] feat: Refactor transaction management and logging (#5) This commit refactors the transaction management and logging system. The `LogRecord` interface is enhanced to include new operation types. The `TransactionMgr` is updated to use the new `recovery` and `concurrency` packages, improving concurrency control and recovery mechanisms. The `ITransaction` interface is moved to a new `transaction_interface` package. Error handling is improved throughout the codebase. --- log/IRecord.go | 12 +++++- transaction/transactionMgr.go | 40 ++++++++++++------- .../ITransaction.go | 2 +- 3 files changed, 37 insertions(+), 17 deletions(-) rename {transaction => transaction_interface}/ITransaction.go (71%) diff --git a/log/IRecord.go b/log/IRecord.go index 876219f..83b57ae 100644 --- a/log/IRecord.go +++ b/log/IRecord.go @@ -1,7 +1,17 @@ package log +const ( + CHECKPOINT = iota + START + COMMIT + ROLLBACK + SETINT + SETSTRING +) + type LogRecord interface { Op() int TxNumber() int - Undo(txnum int) error + Undo(txNum int) + // Optionally: a method to serialize or convert to a Cell } diff --git a/transaction/transactionMgr.go b/transaction/transactionMgr.go index b8fd11e..dac17be 100644 --- a/transaction/transactionMgr.go +++ b/transaction/transactionMgr.go @@ -4,15 +4,17 @@ import ( "fmt" "sync/atomic" "ultraSQL/buffer" + "ultraSQL/concurrency" "ultraSQL/kfile" "ultraSQL/log" + "ultraSQL/recovery" ) type TransactionMgr struct { nextTxNum int64 EndOfFile int - rm *RecoveryMgr - cm *ConcurrencyMgr + rm *recovery.RecoveryMgr + cm *concurrency.ConcurrencyMgr bm *buffer.BufferMgr fm *kfile.FileMgr txtnum int64 @@ -25,8 +27,8 @@ func NewTransaction(fm *kfile.FileMgr, lm *log.LogMgr, bm *buffer.BufferMgr) *Tr bm: bm, } tx.nextTxNum = tx.nextTxNumber() - tx.rm = NewRecoveryMgr(tx, tx.txtnum, lm, bm) - tx.cm = NewConcurrencyMgr() + tx.rm = recovery.NewRecoveryMgr(tx, tx.txtnum, lm, bm) + tx.cm = concurrency.NewConcurrencyMgr() tx.bufferList = NewBufferList(bm) return tx } @@ -34,7 +36,7 @@ func NewTransaction(fm *kfile.FileMgr, lm *log.LogMgr, bm *buffer.BufferMgr) *Tr func (t *TransactionMgr) Commit() { t.rm.Commit() t.cm.Release() - t.bufferlist.UnpinAll() + t.bufferList.UnpinAll() } func (t *TransactionMgr) Rollback() { @@ -45,7 +47,7 @@ func (t *TransactionMgr) Rollback() { func (t *TransactionMgr) Recover() { t.bm.Policy().FlushAll(t.txtnum) - t.recoveryMgr.recover() + t.rm.Recover() } func (t *TransactionMgr) Pin(blk kfile.BlockId) error { @@ -60,21 +62,25 @@ func (t *TransactionMgr) UnPin(blk kfile.BlockId) error { if err != nil { return fmt.Errorf("failed to pin block %v: %w", blk, err) } + return nil } -func (t *TransactionMgr) Size(filename string) int { +func (t *TransactionMgr) Size(filename string) (int, error) { dummyblk := kfile.NewBlockId(filename, t.EndOfFile) - t.cm.sLock(dummyblk) + err := t.cm.SLock(*dummyblk) + if err != nil { + return 0, fmt.Errorf("an error occured when acquiring lock %s", err) + } fileLength, err := t.fm.LengthLocked(filename) if err != nil { - return 0 + return 0, fmt.Errorf("an error occured when acquiring file length %s", err) } - return fileLength + return fileLength, nil } func (t *TransactionMgr) append(filename string) *kfile.BlockId { dummyblk := kfile.NewBlockId(filename, t.EndOfFile) - t.cm.xLock(dummyblk) + t.cm.XLock(*dummyblk) blk, err := t.fm.Append(filename) if err != nil { return nil @@ -93,7 +99,7 @@ func (t *TransactionMgr) nextTxNumber() int64 { } func (t *TransactionMgr) FindCell(blk kfile.BlockId, key []byte) *kfile.Cell { - t.cm.sLock(blk) + t.cm.SLock(blk) buff := t.bufferList.Buffer(blk) cell, _, err := buff.Contents().FindCell(key) if err != nil { @@ -103,16 +109,20 @@ func (t *TransactionMgr) FindCell(blk kfile.BlockId, key []byte) *kfile.Cell { } func (t *TransactionMgr) InsertCell(blk kfile.BlockId, key []byte, val any, okToLog bool) error { - t.cm.xLock(blk) + t.cm.XLock(blk) buff := t.bufferList.Buffer(blk) lsn := -1 + var err error if okToLog { - lsn = t.rm.setValue(buff, key, val) + lsn, err = t.rm.SetCellValue(buff, key, val) + if err != nil { + return nil + } } cellKey := key cell := kfile.NewKVCell(cellKey) p := buff.Contents() - err := p.InsertCell(cell) + err = p.InsertCell(cell) if err != nil { return fmt.Errorf("failed to pin block %v: %w", blk, err) } diff --git a/transaction/ITransaction.go b/transaction_interface/ITransaction.go similarity index 71% rename from transaction/ITransaction.go rename to transaction_interface/ITransaction.go index 9c6d966..62b1df1 100644 --- a/transaction/ITransaction.go +++ b/transaction_interface/ITransaction.go @@ -1,4 +1,4 @@ -package transaction +package transaction_interface type TransactionInterface interface { Commit()