diff --git a/concurrency/concurrencyMgr.go b/concurrency/concurrencyMgr.go new file mode 100644 index 0000000..616bf69 --- /dev/null +++ b/concurrency/concurrencyMgr.go @@ -0,0 +1,101 @@ +package concurrency + +import ( + "fmt" + "sync" + "ultraSQL/kfile" +) + +type ConcurrencyMgr struct { + lTble *LockTable + locks map[kfile.BlockId]string + mu sync.RWMutex // Protect shared map access +} + +func NewConcurrencyMgr() *ConcurrencyMgr { + return &ConcurrencyMgr{ + locks: make(map[kfile.BlockId]string), + } +} + +func (cM *ConcurrencyMgr) SLock(blk kfile.BlockId) error { + cM.mu.Lock() + defer cM.mu.Unlock() + + // If we already have any lock (S or X), no need to acquire again + if _, exists := cM.locks[blk]; exists { + return nil + } + + err := cM.lTble.sLock(blk) + if err != nil { + return fmt.Errorf("failed to acquire shared lock: %w", err) + } + + cM.locks[blk] = "S" + return nil +} + +func (cM *ConcurrencyMgr) XLock(blk kfile.BlockId) error { + cM.mu.Lock() + defer cM.mu.Unlock() + + // If we already have an X lock, no need to acquire again + if cM.hasXLock(blk) { + return nil + } + + // Following the two-phase locking protocol: + // 1. First acquire S lock if we don't have any lock + if _, exists := cM.locks[blk]; !exists { + err := cM.lTble.sLock(blk) + if err != nil { + return fmt.Errorf("failed to acquire initial shared lock: %w", err) + } + cM.locks[blk] = "S" + } + + // 2. Then upgrade to X lock + err := cM.lTble.xLock(blk) + if err != nil { + return fmt.Errorf("failed to upgrade to exclusive lock: %w", err) + } + + cM.locks[blk] = "X" + return nil +} + +func (cM *ConcurrencyMgr) Release() error { + cM.mu.Lock() + defer cM.mu.Unlock() + + var errs []error + for blk := range cM.locks { + if err := cM.lTble.unlock(blk); err != nil { + errs = append(errs, fmt.Errorf("failed to release lock for block %v: %w", blk, err)) + } + } + + // Clear the locks map regardless of errors + cM.locks = make(map[kfile.BlockId]string) + + if len(errs) > 0 { + return fmt.Errorf("errors during release: %v", errs) + } + return nil +} + +func (cM *ConcurrencyMgr) hasXLock(blk kfile.BlockId) bool { + // Note: Caller must hold mutex + lockType, ok := cM.locks[blk] + return ok && lockType == "X" +} + +// Helper method to check current lock status +func (cM *ConcurrencyMgr) GetLockType(blk kfile.BlockId) (string, bool) { + cM.mu.RLock() + defer cM.mu.RUnlock() + + lockType, exists := cM.locks[blk] + return lockType, exists +} diff --git a/concurrency/lockTable.go b/concurrency/lockTable.go new file mode 100644 index 0000000..a345d35 --- /dev/null +++ b/concurrency/lockTable.go @@ -0,0 +1,114 @@ +package concurrency + +import ( + "fmt" + "sync" + "time" + "ultraSQL/kfile" +) + +const MaxWaitTime = 10 * time.Second + +type LockTable struct { + locks map[kfile.BlockId]int // positive: number of shared locks, negative: exclusive lock + mu sync.RWMutex + cond *sync.Cond +} + +func NewLockTable() *LockTable { + lt := &LockTable{ + locks: make(map[kfile.BlockId]int), + } + lt.cond = sync.NewCond(<.mu) + return lt +} + +func (lT *LockTable) sLock(blk kfile.BlockId) error { + lT.mu.Lock() + defer lT.mu.Unlock() + + deadline := time.Now().Add(MaxWaitTime) + + // Wait while there's an exclusive lock on the block + for lT.hasXLock(blk) { + if time.Now().After(deadline) { + return fmt.Errorf("shared lock acquisition timed out for block %v", blk) + } + lT.cond.Wait() + } + + // Increment the number of shared locks (or initialize to 1) + val := lT.getLockVal(blk) + lT.locks[blk] = val + 1 + return nil +} + +func (lT *LockTable) xLock(blk kfile.BlockId) error { + lT.mu.Lock() + defer lT.mu.Unlock() + + deadline := time.Now().Add(MaxWaitTime) + + // Wait while there are other locks (shared or exclusive) + for lT.hasOtherLocks(blk) { + if time.Now().After(deadline) { + return fmt.Errorf("exclusive lock acquisition timed out for block %v", blk) + } + lT.cond.Wait() + } + + // Set to -1 to indicate exclusive lock + lT.locks[blk] = -1 + return nil +} + +func (lT *LockTable) hasXLock(blk kfile.BlockId) bool { + return lT.getLockVal(blk) < 0 +} + +func (lT *LockTable) getLockVal(blk kfile.BlockId) int { + val, exists := lT.locks[blk] + if !exists { + return 0 + } + return val +} + +func (lT *LockTable) hasOtherLocks(blk kfile.BlockId) bool { + val := lT.getLockVal(blk) + return val != 0 && val != 1 // Allow upgrade from single shared lock +} + +func (lT *LockTable) unlock(blk kfile.BlockId) error { + lT.mu.Lock() + defer lT.mu.Unlock() + + val := lT.getLockVal(blk) + if val == 0 { + return fmt.Errorf("attempting to unlock block %v which is not locked", blk) + } + + if val > 1 { + // Decrement shared lock count + lT.locks[blk] = val - 1 + } else { + // Remove last shared lock or exclusive lock + delete(lT.locks, blk) + lT.cond.Broadcast() // Wake up waiting goroutines + } + return nil +} + +// Helper method to get lock information +func (lT *LockTable) GetLockInfo(blk kfile.BlockId) (lockType string, count int) { + lT.mu.RLock() + defer lT.mu.RUnlock() + + val := lT.getLockVal(blk) + if val < 0 { + return "exclusive", 1 + } else if val > 0 { + return "shared", val + } + return "none", 0 +} diff --git a/kfile/file.go b/kfile/file.go index fc1171b..1619f21 100644 --- a/kfile/file.go +++ b/kfile/file.go @@ -23,7 +23,7 @@ func NewBlockId(filename string, blknum int) *BlockId { } } -func (b *BlockId) GetFileName() string { +func (b *BlockId) FileName() string { return b.Filename } diff --git a/kfile/fileMgr.go b/kfile/fileMgr.go index d8d65a6..5c52dc0 100644 --- a/kfile/fileMgr.go +++ b/kfile/fileMgr.go @@ -112,7 +112,7 @@ func (fm *FileMgr) PreallocateFile(blk *BlockId, size int64) error { return err } - filename := blk.GetFileName() + filename := blk.FileName() if err := fm.validatePermissions(); err != nil { return err } @@ -125,7 +125,7 @@ func (fm *FileMgr) validatePreallocationParams(blk *BlockId, size int64) error { if size%int64(fm.blocksize) != 0 { return fmt.Errorf("size must be a multiple of blocksize %d", fm.blocksize) } - if blk.GetFileName() == "" { + if blk.FileName() == "" { return fmt.Errorf("invalid filename") } return nil @@ -193,14 +193,14 @@ func (fm *FileMgr) Read(blk *BlockId, p *SlottedPage) error { fm.mutex.RLock() defer fm.mutex.RUnlock() - f, err := fm.getFile(blk.GetFileName()) + f, err := fm.getFile(blk.FileName()) if err != nil { return fmt.Errorf("failed to get file for block %v: %w", blk, err) } offset := int64(blk.Number() * fm.blocksize) if _, err = f.Seek(offset, io.SeekStart); err != nil { - return fmt.Errorf(seekErrFormat, offset, blk.GetFileName(), err) + return fmt.Errorf(seekErrFormat, offset, blk.FileName(), err) } bytesRead, err := f.Read(p.Contents()) if err != nil { @@ -224,14 +224,14 @@ func (fm *FileMgr) Write(blk *BlockId, p *SlottedPage) error { fm.mutex.Lock() defer fm.mutex.Unlock() - f, err := fm.getFile(blk.GetFileName()) + f, err := fm.getFile(blk.FileName()) if err != nil { return fmt.Errorf("failed to get file for block %v: %w", blk, err) } offset := int64(blk.Number() * fm.blocksize) if _, err = f.Seek(offset, io.SeekStart); err != nil { - return fmt.Errorf(seekErrFormat, offset, blk.GetFileName(), err) + return fmt.Errorf(seekErrFormat, offset, blk.FileName(), err) } bytesWritten, err := f.Write(p.Contents()) if err != nil { @@ -241,7 +241,7 @@ func (fm *FileMgr) Write(blk *BlockId, p *SlottedPage) error { return fmt.Errorf("incomplete write: expected %d bytes, wrote %d", fm.blocksize, bytesWritten) } if err = f.Sync(); err != nil { - return fmt.Errorf("failed to sync file %s: %w", blk.GetFileName(), err) + return fmt.Errorf("failed to sync file %s: %w", blk.FileName(), err) } fm.blocksWritten++ @@ -379,7 +379,7 @@ func (fm *FileMgr) WriteLog() []ReadWriteLogEntry { // ensureFileSize ensures the file has at least the required number of blocks. func (fm *FileMgr) ensureFileSize(blk *BlockId, requiredBlocks int) error { - currentBlocks, err := fm.Length(blk.GetFileName()) + currentBlocks, err := fm.Length(blk.FileName()) if err != nil { return err } @@ -399,7 +399,7 @@ func (fm *FileMgr) RenameFile(blk *BlockId, newFileName string) error { return fmt.Errorf("invalid new filename: %s", newFileName) } - oldFileName := blk.GetFileName() + oldFileName := blk.FileName() // Close the old file if it is open. fm.openFilesLock.Lock() diff --git a/kfile/file__dir_test.go b/kfile/file__dir_test.go index 183f93a..03121c5 100644 --- a/kfile/file__dir_test.go +++ b/kfile/file__dir_test.go @@ -46,8 +46,8 @@ func TestBlock(t *testing.T) { Blknum := 5 blk := NewBlockId(Filename, Blknum) - if blk.GetFileName() != Filename { - t.Errorf("Expected Filename %s, got %s", Filename, blk.GetFileName()) + if blk.FileName() != Filename { + t.Errorf("Expected Filename %s, got %s", Filename, blk.FileName()) } if blk.Number() != Blknum { @@ -126,13 +126,13 @@ func TestBlock(t *testing.T) { // Test NextBlock next := blk.NextBlock() - if next.Number() != 6 || next.GetFileName() != "test.db" { + if next.Number() != 6 || next.FileName() != "test.db" { t.Error("NextBlock returned incorrect block") } // Test PrevBlock prev := blk.PrevBlock() - if prev.Number() != 4 || prev.GetFileName() != "test.db" { + if prev.Number() != 4 || prev.FileName() != "test.db" { t.Error("PrevBlock returned incorrect block") } @@ -372,8 +372,8 @@ func TestBlockId(t *testing.T) { blknum := 5 blk := NewBlockId(filename, blknum) - if blk.GetFileName() != filename { - t.Errorf("Expected Filename %s, got %s", filename, blk.GetFileName()) + if blk.FileName() != filename { + t.Errorf("Expected Filename %s, got %s", filename, blk.FileName()) } if blk.Number() != blknum { @@ -448,12 +448,12 @@ func TestBlockId(t *testing.T) { blk := NewBlockId("test.db", 5) next := blk.NextBlock() - if next.Number() != 6 || next.GetFileName() != "test.db" { + if next.Number() != 6 || next.FileName() != "test.db" { t.Error("NextBlock returned incorrect block") } prev := blk.PrevBlock() - if prev.Number() != 4 || prev.GetFileName() != "test.db" { + if prev.Number() != 4 || prev.FileName() != "test.db" { t.Error("PrevBlock returned incorrect block") } @@ -779,7 +779,7 @@ func TestFileRename(t *testing.T) { t.Errorf("Could not rename file %s", err) } want := new_file - got := blk.GetFileName() + got := blk.FileName() if want != got { t.Errorf("want %s but got %s", want, got) } diff --git a/log/IRecord.go b/log/IRecord.go new file mode 100644 index 0000000..83b57ae --- /dev/null +++ b/log/IRecord.go @@ -0,0 +1,17 @@ +package log + +const ( + CHECKPOINT = iota + START + COMMIT + ROLLBACK + SETINT + SETSTRING +) + +type LogRecord interface { + Op() int + TxNumber() int + Undo(txNum int) + // Optionally: a method to serialize or convert to a Cell +} diff --git a/log/logmgr.go b/log/logmgr.go index 3076373..d4c3d81 100644 --- a/log/logmgr.go +++ b/log/logmgr.go @@ -69,7 +69,7 @@ func NewLogMgr(fm *kfile.FileMgr, bm *buffer.BufferMgr, logFile string) (*LogMgr return nil, &Error{Op: "new", Err: fmt.Errorf("failed to append initial block: %w", err)} } // Inform the buffer manager that this block is in use. - lm.bm.Policy.AllocateBufferForBlock(*lm.currentBlock) + lm.bm.Policy().AllocateBufferForBlock(*lm.currentBlock) } else { // Otherwise, set the current block as the last block. lm.currentBlock = kfile.NewBlockId(logFile, lm.logsize-1) @@ -164,7 +164,7 @@ func (lm *LogMgr) Append(logrec []byte) (int, []byte, error) { return 0, nil, &Error{Op: "append", Err: fmt.Errorf("failed to append new block: %w", err)} } // You may want to inform the buffer manager about the new block. - lm.bm.Policy.AllocateBufferForBlock(*lm.currentBlock) + lm.bm.Policy().AllocateBufferForBlock(*lm.currentBlock) // Try inserting again into the new log page. logPage = lm.logBuffer.Contents() if err = logPage.InsertCell(cell); err != nil { @@ -212,3 +212,7 @@ func (lm *LogMgr) ValidateKey(key []byte) bool { generatedKey := lm.GenerateKey() return bytes.Compare(key, generatedKey) == 0 } + +func (lm *LogMgr) Buffer() *buffer.Buffer { + return lm.logBuffer +} diff --git a/log_record/log_record.go b/log_record/log_record.go new file mode 100644 index 0000000..ed676b6 --- /dev/null +++ b/log_record/log_record.go @@ -0,0 +1,180 @@ +package log_record + +import ( + "bytes" + "encoding/binary" + "fmt" + "ultraSQL/kfile" + _ "ultraSQL/kfile" + "ultraSQL/log" + "ultraSQL/transaction" +) + +// Example op code if you're not using separate ones. +const UNIFIEDUPDATE = 100 + +type UnifiedUpdateRecord struct { + txNum int64 + blkFile string + blkNum int + key []byte + oldBytes []byte + newBytes []byte +} + +func WriteUnifiedUpdateLogRecord( + lm *log.LogMgr, + txNum int64, + blk *kfile.BlockId, + key []byte, + oldBytes []byte, + newBytes []byte, +) int { + // Implementation up to you; typically you’d: + // 1) Create a record structure (e.g. UnifiedUpdateRecord). + // 2) Serialize txNum, block info, slotIndex, oldBytes, newBytes. + // 3) Append to log manager, returning the LSN. + return 0 // placeholder +} + +// Ensure it satisfies the LogRecord transaction_interface +func (rec *UnifiedUpdateRecord) Op() int { + return UNIFIEDUPDATE +} +func (rec *UnifiedUpdateRecord) TxNumber() int64 { + return rec.txNum +} + +// Undo reverts the page/slot to oldBytes +func (rec *UnifiedUpdateRecord) Undo(tx *transaction.TransactionMgr) { + // 1) Pin or fetch the buffer for rec.blkFile, rec.blkNum + // 2) Cast to SlottedPage + // 3) Overwrite the cell at rec.slotIndex with oldBytes + fmt.Printf("Undoing unified update: restoring old cell bytes for tx=%d slot=%d\n", rec.txNum, rec.slotIndex) + // ... actual code ... +} + +// Serialize the record to bytes +func (rec *UnifiedUpdateRecord) ToBytes() []byte { + buf := new(bytes.Buffer) + // 1. Op code + _ = binary.Write(buf, binary.BigEndian, int32(UNIFIEDUPDATE)) + // 2. txNum + _ = binary.Write(buf, binary.BigEndian, rec.txNum) + + // 3. block info + writeString(buf, rec.blkFile) + _ = binary.Write(buf, binary.BigEndian, int32(rec.blkNum)) + _ = binary.Write(buf, binary.BigEndian, int32(rec.slotIndex)) + + // 4. oldBytes + writeBytes(buf, rec.oldBytes) + // 5. newBytes + writeBytes(buf, rec.newBytes) + return buf.Bytes() +} + +// parse UnifiedUpdateRecord from bytes +func FromBytesUnifiedUpdate(data []byte) (*UnifiedUpdateRecord, error) { + buf := bytes.NewReader(data) + + var op int32 + if err := binary.Read(buf, binary.BigEndian, &op); err != nil { + return nil, err + } + if op != UNIFIEDUPDATE { + return nil, fmt.Errorf("not a unified update record") + } + + var txNum int64 + if err := binary.Read(buf, binary.BigEndian, &txNum); err != nil { + return nil, err + } + + blkFile, err := readString(buf) + if err != nil { + return nil, err + } + + var blkNum int32 + if err := binary.Read(buf, binary.BigEndian, &blkNum); err != nil { + return nil, err + } + var slotIndex int32 + if err := binary.Read(buf, binary.BigEndian, &slotIndex); err != nil { + return nil, err + } + + oldBytes, err := readBytes(buf) + if err != nil { + return nil, err + } + newBytes, err := readBytes(buf) + if err != nil { + return nil, err + } + + return &UnifiedUpdateRecord{ + txNum: txNum, + blkFile: blkFile, + blkNum: int(blkNum), + slotIndex: int(slotIndex), + oldBytes: oldBytes, + newBytes: newBytes, + }, nil +} + +// Helpers for writing/reading strings/bytes: + +func writeString(buf *bytes.Buffer, s string) { + writeBytes(buf, []byte(s)) +} +func readString(buf *bytes.Reader) (string, error) { + b, err := readBytes(buf) + if err != nil { + return "", err + } + return string(b), nil +} +func writeBytes(buf *bytes.Buffer, data []byte) { + _ = binary.Write(buf, binary.BigEndian, int32(len(data))) + buf.Write(data) +} +func readBytes(buf *bytes.Reader) ([]byte, error) { + var length int32 + if err := binary.Read(buf, binary.BigEndian, &length); err != nil { + return nil, err + } + if length < 0 { + return nil, fmt.Errorf("negative length") + } + b := make([]byte, length) + n, err := buf.Read(b) + if err != nil || n != int(length) { + return nil, fmt.Errorf("failed to read bytes") + } + return b, nil +} + +func CreateLogRecord(data []byte) log.LogRecord { + // Peek at op code + if len(data) < 4 { + return nil + } + op := int32(binary.BigEndian.Uint32(data[0:4])) + switch op { + case log.CHECKPOINT: + return NewCheckpointRecordFromBytes(data) + case log.START: + return NewStartRecordFromBytes(data) + case log.COMMIT: + return NewCommitRecordFromBytes(data) + case log.ROLLBACK: + return NewRollbackRecordFromBytes(data) + case UNIFIEDUPDATE: + rec, _ := FromBytesUnifiedUpdate(data) + return rec + default: + return nil + } +} diff --git a/recovery/recoveryMgr.go b/recovery/recoveryMgr.go new file mode 100644 index 0000000..58b91ec --- /dev/null +++ b/recovery/recoveryMgr.go @@ -0,0 +1,159 @@ +package recovery + +import ( + "fmt" + "ultraSQL/buffer" + "ultraSQL/log" + "ultraSQL/log_record" + "ultraSQL/transaction" +) + +// RecoveryMgr manages the logging and recovery for a given transaction. +type RecoveryMgr struct { + lm *log.LogMgr + bm *buffer.BufferMgr + tx *transaction.TransactionMgr + txNum int64 +} + +// NewRecoveryMgr is analogous to the Java constructor: +// +// public RecoveryMgr(Transaction tx, int txnum, LogMgr lm, BufferMgr bm) +func NewRecoveryMgr(tx *transaction.TransactionMgr, txNum int64, lm *log.LogMgr, bm *buffer.BufferMgr) *RecoveryMgr { + rm := &RecoveryMgr{ + tx: tx, + txNum: txNum, + lm: lm, + bm: bm, + } + // Write a START record to the log + StartRecordWriteToLog(lm, txNum) // e.g., StartRecord.writeToLog(lm, txNum) + return rm +} + +// Commit is analogous to public void commit() +func (r *RecoveryMgr) Commit() { + // 1. Flush all buffers associated with this transaction + r.bm.Policy().FlushAll(r.txNum) + + // 2. Write COMMIT record to the log + lsn := CommitRecordWriteToLog(r.lm, r.txNum) // e.g., CommitRecord.writeToLog(r.lm, r.txNum) + + // 3. Force the log up to that LSN + flushErr := r.lm.Buffer().FlushLSN(lsn) + if flushErr != nil { + fmt.Printf("error occurred during commit flush: %v\n", flushErr) + } +} + +// Rollback is analogous to public void rollback() +func (r *RecoveryMgr) Rollback() { + r.doRollback() + r.bm.Policy().FlushAll(r.txNum) + lsn := RollbackRecordWriteToLog(r.lm, r.txNum) + flushErr := r.lm.Buffer().FlushLSN(lsn) + if flushErr != nil { + fmt.Printf("error occurred during rollback flush: %v\n", flushErr) + } +} + +// Recover is analogous to public void recover() +func (r *RecoveryMgr) Recover() { + r.doRecover() + r.bm.Policy().FlushAll(r.txNum) + lsn := CheckpointRecordWriteToLog(r.lm) // e.g., CheckpointRecord.writeToLog(lm) + flushErr := r.lm.Buffer().FlushLSN(lsn) + if flushErr != nil { + fmt.Printf("error occurred during recovery flush: %v\n", flushErr) + } +} + +// SetCellValue updates the cell in a slotted page, then writes a unified log record +// that stores the old/new serialized cell bytes for undo/redo. +func (r *RecoveryMgr) SetCellValue(buff *buffer.Buffer, key []byte, newVal any) (int, error) { + // 1. Get the slotted page from the buffer. + sp := buff.Contents() + + // 2. Retrieve the cell at the given slot. + cell, _, err := sp.FindCell(key) + if err != nil { + return -1, fmt.Errorf("failed to get cell at slot %d: %w", key, err) + } + + // 3. Serialize the current (old) cell state. + oldBytes := cell.ToBytes() + + // 4. Update the cell with the new value (the cell handles type encoding). + if err := cell.SetValue(newVal); err != nil { + return -1, fmt.Errorf("failed to set cell value: %w", err) + } + + // 5. Serialize the new cell state. + newBytes := cell.ToBytes() + + // 6. Write a unified update record to the log: includes txNum, block ID, slotIndex, oldBytes, newBytes. + blk := buff.Block() // or any *BlockId if your Buffer returns it + lsn := log_record.WriteUnifiedUpdateLogRecord(r.lm, r.txNum, blk, key, oldBytes, newBytes) + + // 7. Return the LSN so the caller can handle further flush or keep track of it. + return lsn, nil +} + +// doRollback performs a backward scan of the log to undo any record belonging to this transaction. +func (r *RecoveryMgr) doRollback() { + iter, err := r.lm.Iterator() + if err != nil { + fmt.Printf("error occurred creating log iterator: %v\n", err) + return + } + for iter.HasNext() { + data, err := iter.Next() + if err != nil { + fmt.Printf("error occurred reading next log record: %v\n", err) + return + } + rec := CreateLogRecord(data) // e.g. UnifiedUpdateRecord or other record + if rec == nil { + continue + } + if rec.TxNumber() == r.txNum { + if rec.Op() == START { + // Once we reach the START record for our transaction, we stop + return + } + rec.Undo(r.tx) // "Undo" is record-specific logic + } + } +} + +// doRecover replays the log from the end, undoing updates for transactions that never committed. +func (r *RecoveryMgr) doRecover() { + finishedTxs := make(map[int64]bool) + + iter, err := r.lm.Iterator() + if err != nil { + fmt.Printf("error occurred creating log iterator: %v\n", err) + return + } + for iter.HasNext() { + data, err := iter.Next() + if err != nil { + fmt.Printf("error occurred reading next log record: %v\n", err) + return + } + rec := CreateLogRecord(data) + if rec == nil { + continue + } + switch rec.Op() { + case CHECKPOINT: + return + case COMMIT, ROLLBACK: + finishedTxs[rec.TxNumber()] = true + default: + if !finishedTxs[rec.TxNumber()] { + rec.Undo(r.tx) + } + } + } +} diff --git a/transaction/bufferlist.go b/transaction/bufferlist.go index c1fd2a7..0fe77e2 100644 --- a/transaction/bufferlist.go +++ b/transaction/bufferlist.go @@ -1,10 +1,59 @@ package transaction -import "ultraSQL/buffer" +import ( + "fmt" + "ultraSQL/buffer" + "ultraSQL/kfile" +) type BufferList struct { + bm *buffer.BufferMgr + buffers map[kfile.BlockId]*buffer.Buffer } -func NewBufferList(*buffer.BufferMgr) *BufferList { - return &BufferList{} +func NewBufferList(bm *buffer.BufferMgr) *BufferList { + return &BufferList{ + bm: bm, + buffers: make(map[kfile.BlockId]*buffer.Buffer), + } +} + +// Buffer retrieves a pinned Buffer (if any) for the given block +func (bl *BufferList) Buffer(blk kfile.BlockId) *buffer.Buffer { + return bl.buffers[blk] +} + +// Pin pins the specified block if it isn't already pinned in this BufferList +func (bl *BufferList) Pin(blk kfile.BlockId) error { + if _, exists := bl.buffers[blk]; exists { + // already pinned in this transaction + return nil + } + buff, err := bl.bm.Pin(&blk) + if err != nil { + return fmt.Errorf("failed to pin block %v: %w", blk, err) + } + bl.buffers[blk] = buff + return nil +} + +// Unpin unpins the specified block +func (bl *BufferList) Unpin(blk kfile.BlockId) error { + buff, exists := bl.buffers[blk] + if !exists { + // not pinned in this transaction + return nil + } + bl.bm.Unpin(buff) + delete(bl.buffers, blk) + return nil +} + +// UnpinAll unpins all blocks pinned by this BufferList +func (bl *BufferList) UnpinAll() { + for _, buff := range bl.buffers { + bl.bm.Unpin(buff) + } + // reset map + bl.buffers = make(map[kfile.BlockId]*buffer.Buffer) } diff --git a/transaction/concurrencyMgr.go b/transaction/concurrencyMgr.go deleted file mode 100644 index ed6b716..0000000 --- a/transaction/concurrencyMgr.go +++ /dev/null @@ -1,8 +0,0 @@ -package transaction - -type ConcurrencyMgr struct { -} - -func NewConcurrencyMgr() *ConcurrencyMgr { - return &ConcurrencyMgr{} -} diff --git a/transaction/recoveryMgr.go b/transaction/recoveryMgr.go deleted file mode 100644 index 9f92640..0000000 --- a/transaction/recoveryMgr.go +++ /dev/null @@ -1,17 +0,0 @@ -package transaction - -import ( - "ultraSQL/buffer" - "ultraSQL/log" -) - -type RecoveryMgr struct { -} - -func NewRecoveryMgr(txMgr *TransactionMgr, txtnum int64, lm *log.LogMgr, bm *buffer.BufferMgr) *RecoveryMgr { - return &RecoveryMgr{} -} - -func (r *RecoveryMgr) Commit() { - -} diff --git a/transaction/transactionMgr.go b/transaction/transactionMgr.go index c37389a..dac17be 100644 --- a/transaction/transactionMgr.go +++ b/transaction/transactionMgr.go @@ -1,17 +1,20 @@ package transaction import ( + "fmt" "sync/atomic" "ultraSQL/buffer" + "ultraSQL/concurrency" "ultraSQL/kfile" "ultraSQL/log" + "ultraSQL/recovery" ) type TransactionMgr struct { nextTxNum int64 EndOfFile int - rm *RecoveryMgr - cm *ConcurrencyMgr + rm *recovery.RecoveryMgr + cm *concurrency.ConcurrencyMgr bm *buffer.BufferMgr fm *kfile.FileMgr txtnum int64 @@ -20,12 +23,12 @@ type TransactionMgr struct { func NewTransaction(fm *kfile.FileMgr, lm *log.LogMgr, bm *buffer.BufferMgr) *TransactionMgr { tx := &TransactionMgr{ - fm: fm, - bm: bm, - txtnum: nextTxNumber(), + fm: fm, + bm: bm, } - tx.rm = NewRecoveryMgr(tx, tx.txtnum, lm, bm) - tx.cm = NewConcurrencyMgr() + tx.nextTxNum = tx.nextTxNumber() + tx.rm = recovery.NewRecoveryMgr(tx, tx.txtnum, lm, bm) + tx.cm = concurrency.NewConcurrencyMgr() tx.bufferList = NewBufferList(bm) return tx } @@ -33,7 +36,7 @@ func NewTransaction(fm *kfile.FileMgr, lm *log.LogMgr, bm *buffer.BufferMgr) *Tr func (t *TransactionMgr) Commit() { t.rm.Commit() t.cm.Release() - t.bufferlist.UnpinAll() + t.bufferList.UnpinAll() } func (t *TransactionMgr) Rollback() { @@ -44,38 +47,85 @@ func (t *TransactionMgr) Rollback() { func (t *TransactionMgr) Recover() { t.bm.Policy().FlushAll(t.txtnum) - t.recoveryMgr.recover() + t.rm.Recover() } -func (t *TransactionMgr) Pin(blk *kfile.BlockId) { - bufferList.Pin(blk) +func (t *TransactionMgr) Pin(blk kfile.BlockId) error { + err := t.bufferList.Pin(blk) + if err != nil { + return fmt.Errorf("failed to pin block %v: %w", blk, err) + } + return nil } -func (t *TransactionMgr) UnPin(blk *kfile.BlockId) { - bufferList.UnPin(blk) +func (t *TransactionMgr) UnPin(blk kfile.BlockId) error { + err := t.bufferList.Unpin(blk) + if err != nil { + return fmt.Errorf("failed to pin block %v: %w", blk, err) + } + return nil } -func (t *TransactionMgr) Size(filename string)int { - dummyblk := kfile.NewBlockId(filename, t.EndOfFile); - t.cm.sLock(dummyblk); - fileLength,err := t.fm.LengthLocked(filename); - if err != nil{ - return 0 +func (t *TransactionMgr) Size(filename string) (int, error) { + dummyblk := kfile.NewBlockId(filename, t.EndOfFile) + err := t.cm.SLock(*dummyblk) + if err != nil { + return 0, fmt.Errorf("an error occured when acquiring lock %s", err) + } + fileLength, err := t.fm.LengthLocked(filename) + if err != nil { + return 0, fmt.Errorf("an error occured when acquiring file length %s", err) } - return fileLength; + return fileLength, nil } func (t *TransactionMgr) append(filename string) *kfile.BlockId { - BlockId dummyblk = new BlockId(filename, END_OF_FILE); - concurMgr.xLock(dummyblk); - return fm.append(filename); + dummyblk := kfile.NewBlockId(filename, t.EndOfFile) + t.cm.XLock(*dummyblk) + blk, err := t.fm.Append(filename) + if err != nil { + return nil + } + return blk } -func (t *TransactionMgr) blockSize() int { - return t.fm.BlockSize(); +func (t *TransactionMgr) blockSize() int { + return t.fm.BlockSize() } -func (t *TransactionMgr) AvailableBuffs() int { - return t.bm.Available(); +func (t *TransactionMgr) AvailableBuffs() int { + return t.bm.Available() } func (t *TransactionMgr) nextTxNumber() int64 { return atomic.AddInt64(&t.nextTxNum, 1) -} \ No newline at end of file +} + +func (t *TransactionMgr) FindCell(blk kfile.BlockId, key []byte) *kfile.Cell { + t.cm.SLock(blk) + buff := t.bufferList.Buffer(blk) + cell, _, err := buff.Contents().FindCell(key) + if err != nil { + return nil + } + return cell +} + +func (t *TransactionMgr) InsertCell(blk kfile.BlockId, key []byte, val any, okToLog bool) error { + t.cm.XLock(blk) + buff := t.bufferList.Buffer(blk) + lsn := -1 + var err error + if okToLog { + lsn, err = t.rm.SetCellValue(buff, key, val) + if err != nil { + return nil + } + } + cellKey := key + cell := kfile.NewKVCell(cellKey) + p := buff.Contents() + err = p.InsertCell(cell) + if err != nil { + return fmt.Errorf("failed to pin block %v: %w", blk, err) + } + buff.MarkModified(t.txtnum, lsn) + return nil +} diff --git a/transaction/ITransaction.go b/transaction_interface/ITransaction.go similarity index 71% rename from transaction/ITransaction.go rename to transaction_interface/ITransaction.go index 9c6d966..62b1df1 100644 --- a/transaction/ITransaction.go +++ b/transaction_interface/ITransaction.go @@ -1,4 +1,4 @@ -package transaction +package transaction_interface type TransactionInterface interface { Commit() diff --git a/utils/LogIterator.go b/utils/LogIterator.go index cded21e..418ab9e 100644 --- a/utils/LogIterator.go +++ b/utils/LogIterator.go @@ -43,7 +43,7 @@ func (it *LogIterator) Next() ([]byte, error) { // strictly speaking, we have no next record return nil, fmt.Errorf("no more records in block 0") } - newBlk := kfile.NewBlockId(it.blk.GetFileName(), it.blk.Number()-1) + newBlk := kfile.NewBlockId(it.blk.FileName(), it.blk.Number()-1) if err := it.moveToBlock(newBlk); err != nil { return nil, err }