diff --git a/packages/core/lib/utils/store/io.dart b/packages/core/lib/utils/store/io.dart
index 91810f4..3f57ec7 100644
--- a/packages/core/lib/utils/store/io.dart
+++ b/packages/core/lib/utils/store/io.dart
@@ -11,14 +11,30 @@ import 'package:path_provider/path_provider.dart';
 class StoreImpl with Store {
   final bool storageJson;
   late final Future _migrationCompleted;
-
+
+  // Queue size limits (bytes)
+  static const int _kMaxQueueBytes = 512 * 1024; // 512 KB
+  static const int _kTargetQueueBytes = 475 * 1024; // target after trimming 475 KB
+
+  // The fileKey used by the queue/flushing plugin in this repo.
+  // The file produced will be analytics-flutter-queue_flushing_plugin.json
+  static const String _kQueueFileKey = 'queue_flushing_plugin';
+
+  // The field name inside the JSON payload that holds events (adjusted per user's example)
+  // The user's queue JSON uses a top-level "queue" array.
+  static const String _kQueueField = 'queue';
+
   StoreImpl({this.storageJson = true}) {
     // Start migration immediately but don't block construction
     _migrationCompleted = _migrateFilesFromDocumentsToSupport();
   }
 
   @override
   Future get ready => Future.value();
 
+  @override
+  void dispose() {}
+
   @override
   Future<Map<String, dynamic>?> getPersisted(String key) async {
     if (!storageJson) return Future.value(null);
@@ -34,7 +50,7 @@ class StoreImpl with Store {
     await _migrationCompleted;
     return _writeFile(key, value);
   }
-
+
   @override
   Future deletePersisted(String key) async {
     if (!storageJson) return;
@@ -45,17 +61,187 @@ class StoreImpl with Store {
   }
 
   Future _writeFile(String fileKey, Map<String, dynamic> data) async {
-    RandomAccessFile file =
-        await _getFile(fileKey, create: true) as RandomAccessFile;
-    final serialized = json.encode(data);
-    final buffer = utf8.encode(serialized);
-
-    file.lockSync(FileLock.blockingExclusive);
-    file.setPositionSync(0);
-    file.writeFromSync(buffer);
-    file.truncateSync(buffer.length);
-    file.unlockSync();
-    file.closeSync();
+    // Serialize once; may be replaced by trimmed version below
+    String serialized = json.encode(data);
+    List<int> buffer = utf8.encode(serialized);
+
+    // If this is the queue file, enforce size limits by trimming oldest events
+    if (fileKey == _kQueueFileKey) {
+      // Ensure we operate against the configured queue field name.
+      final currentEvents = [];
+      if (data[_kQueueField] is List) {
+        currentEvents.addAll(data[_kQueueField] as List);
+      }
+
+      // If the serialized payload is already too large, attempt trim before writing
+      if (buffer.length > _kMaxQueueBytes) {
+        final trimmed = _trimQueueToTargetMap(data, _kTargetQueueBytes, _kQueueField);
+        serialized = json.encode(trimmed);
+        buffer = utf8.encode(serialized);
+
+        // If still too big after trimming to target, write empty queue as last resort
+        if (buffer.length > _kMaxQueueBytes) {
+          serialized = json.encode({_kQueueField: []});
+          buffer = utf8.encode(serialized);
+        }
+      }
+    }
+
+    RandomAccessFile? file;
+    try {
+      file = await _getFile(fileKey, create: true) as RandomAccessFile;
+      // Acquire exclusive lock and write
+      file.lockSync(FileLock.blockingExclusive);
+      try {
+        file.setPositionSync(0);
+        file.writeFromSync(buffer);
+        file.truncateSync(buffer.length);
+      } finally {
+        // Ensure unlock and close
+        try {
+          file.unlockSync();
+        } catch (_) {}
+        try {
+          file.closeSync();
+        } catch (_) {}
+      }
+      return;
+    } on FileSystemException catch (_) {
+      // Recovery path for disk full / write errors for queue file
+      if (fileKey != _kQueueFileKey) {
+        rethrow;
+      }
+
+      // Try reading existing stored events and aggressively trim until we can write
+      try {
+        final existing = await _readFile(fileKey);
+        List events = (existing != null && existing[_kQueueField] is List)
+            ? List.from(existing[_kQueueField] as List)
+            : [];
+
+        // Try progressively trimming and writing, removing oldest events first
+        while (true) {
+          // Build candidate payload
+          final candidateMap = {_kQueueField: events};
+          final candidateText = json.encode(candidateMap);
+          final candidateBuffer = utf8.encode(candidateText);
+
+          if (candidateBuffer.length <= _kMaxQueueBytes) {
+            // Attempt to write this candidate
+            RandomAccessFile? f;
+            try {
+              f = await _getFile(fileKey, create: true) as RandomAccessFile;
+              f.lockSync(FileLock.blockingExclusive);
+              try {
+                f.setPositionSync(0);
+                f.writeFromSync(candidateBuffer);
+                f.truncateSync(candidateBuffer.length);
+                // success
+                try {
+                  f.unlockSync();
+                } catch (_) {}
+                try {
+                  f.closeSync();
+                } catch (_) {}
+                return;
+              } finally {
+                // Ensure unlock/close in case of failures inside try
+                try {
+                  f.unlockSync();
+                } catch (_) {}
+                try {
+                  f.closeSync();
+                } catch (_) {}
+              }
+            } on FileSystemException {
+              // Couldn't write; fall through to trimming more events
+            }
+          }
+
+          // If no events left, try to write an empty queue
+          if (events.isEmpty) {
+            final emptyBuf = utf8.encode(json.encode({_kQueueField: []}));
+            try {
+              RandomAccessFile? f2 =
+                  await _getFile(fileKey, create: true) as RandomAccessFile;
+              f2.lockSync(FileLock.blockingExclusive);
+              try {
+                f2.setPositionSync(0);
+                f2.writeFromSync(emptyBuf);
+                f2.truncateSync(emptyBuf.length);
+                try {
+                  f2.unlockSync();
+                } catch (_) {}
+                try {
+                  f2.closeSync();
+                } catch (_) {}
+                return;
+              } finally {
+                try {
+                  f2.unlockSync();
+                } catch (_) {}
+                try {
+                  f2.closeSync();
+                } catch (_) {}
+              }
+            } on FileSystemException {
+              // rethrow original error
+              rethrow;
+            }
+          }
+
+          // Remove the oldest event and try again
+          events.removeAt(0);
+        }
+      } catch (e) {
+        // If recovery fails, rethrow the original exception
+        rethrow;
+      }
+    } finally {
+      // Ensure any opened file is closed/unlocked (defensive)
+      try {
+        // Use null-aware calls to avoid calling on null.
+        file?.unlockSync();
+      } catch (_) {}
+      try {
+        file?.closeSync();
+      } catch (_) {}
+    }
+  }
+
+  /// Trim the queue in [data] (expected to be a map containing an array under [queueField])
+  /// until the serialized size is <= targetBytes. Returns a new Map containing
+  /// the trimmed queue list.
+  Map<String, dynamic> _trimQueueToTargetMap(
+      Map<String, dynamic> data, int targetBytes, String queueField) {
+    final events = [];
+    if (data[queueField] is List) {
+      events.addAll(data[queueField] as List);
+    }
+
+    // If no events or not a list, return minimal representation
+    if (events.isEmpty) {
+      return {queueField: []};
+    }
+
+    // Fast path: if current serialized size is already small enough, return input
+    var candidate = {queueField: events};
+    var s = json.encode(candidate);
+    var b = utf8.encode(s);
+    if (b.length <= targetBytes) {
+      return candidate;
+    }
+
+    // Iteratively remove oldest events until serialized size <= targetBytes or no events left
+    while (b.length > targetBytes && events.isNotEmpty) {
+      events.removeAt(0);
+      candidate = {queueField: events};
+      s = json.encode(candidate);
+      b = utf8.encode(s);
+    }
+
+    return {queueField: events};
   }
 
   Future<Map<String, dynamic>?> _readFile(String fileKey) async {
@@ -63,19 +249,31 @@ class StoreImpl with Store {
     if (file == null) {
       return null;
     }
-    file = await file.lock(FileLock.blockingShared);
-    final length = file.lengthSync();
-    file.setPositionSync(0);
-    final buffer = Uint8List(length);
-    file.readIntoSync(buffer);
-    file.unlockSync();
-    file.closeSync();
-    final contentText = utf8.decode(buffer);
-    if (contentText == "{}") {
-      return null; // Prefer null to empty map, because we'll want to initialise a valid empty value.
-    }
-    return json.decode(contentText) as Map<String, dynamic>;
+    try {
+      file = await file.lock(FileLock.blockingShared);
+      final length = file.lengthSync();
+      file.setPositionSync(0);
+      final buffer = Uint8List(length);
+      file.readIntoSync(buffer);
+      file.unlockSync();
+      file.closeSync();
+      final contentText = utf8.decode(buffer);
+      if (contentText == "{}") {
+        return null; // empty file
+      }
+
+      return json.decode(contentText) as Map<String, dynamic>;
+    } on FileSystemException {
+      // Can't read the file -> return null for safety
+      try {
+        file?.unlockSync();
+      } catch (_) {}
+      try {
+        file?.closeSync();
+      } catch (_) {}
+      return null;
+    }
   }
 
   Future _fileName(String fileKey) async {
@@ -88,6 +286,7 @@ class StoreImpl with Store {
     final file = File(await _fileName(fileKey));
 
     if (await file.exists()) {
+      // Open in append mode so we can lock and then write/truncate
      return await file.open(mode: FileMode.append);
     } else if (create) {
       await file.create(recursive: true);
@@ -113,44 +312,62 @@ class StoreImpl with Store {
     }
   }
 
-  /// Migrates existing analytics files from Documents directory to Application Support directory
+  /// Move any analytics-flutter-*.json files from Documents (old location) to
+  /// ApplicationSupport (new location). This migration is best-effort and will
+  /// ignore errors so SDK initialization can proceed.
   Future _migrateFilesFromDocumentsToSupport() async {
+    if (!storageJson) return;
     try {
       final oldDir = await _getOldDocumentDir();
       final newDir = await _getNewDocumentDir();
-
-      // List all analytics files in the old directory
-      final oldDirFiles = oldDir.listSync()
-          .whereType<File>()
-          .where((file) => file.path.contains('analytics-flutter-') && file.path.endsWith('.json'))
-          .toList();
-
-      for (final oldFile in oldDirFiles) {
-        final fileName = oldFile.path.split('/').last;
-        final newFilePath = '${newDir.path}/$fileName';
-        final newFile = File(newFilePath);
-
-        // Only migrate if the file doesn't already exist in the new location
-        if (!await newFile.exists()) {
-          try {
-            // Ensure the new directory exists
-            await newDir.create(recursive: true);
-
-            // Copy the file to the new location
-            await oldFile.copy(newFilePath);
-
-            // Delete the old file after successful copy
-            await oldFile.delete();
-          } catch (e) {
-            // The app should continue to work even if migration fails
+
+      // If same path, nothing to do
+      if (oldDir.path == newDir.path) return;
+
+      // Ensure new dir exists
+      try {
+        if (!await Directory(newDir.path).exists()) {
+          await Directory(newDir.path).create(recursive: true);
+        }
+      } catch (_) {}
+
+      // List files in old directory and move ones that match analytics-flutter-*.json
+      final oldDirectory = Directory(oldDir.path);
+      if (!await oldDirectory.exists()) return;
+
+      await for (final entity in oldDirectory.list()) {
+        if (entity is File) {
+          final name = entity.uri.pathSegments.isNotEmpty
+              ? entity.uri.pathSegments.last
+              : '';
+          if (name.startsWith('analytics-flutter-') &&
+              name.endsWith('.json')) {
+            final destPath = '${newDir.path}/$name';
+            final destFile = File(destPath);
+
+            try {
+              // If destination already exists, skip or optionally merge - we skip here.
+              if (!await destFile.exists()) {
+                await entity.rename(destPath);
+              } else {
+                // If a file already exists at the destination, attempt to remove the old file.
+                try {
+                  await entity.delete();
+                } catch (_) {}
+              }
+            } catch (_) {
+              // Try fallback: copy then delete
+              try {
+                await entity.copy(destPath);
+                await entity.delete();
+              } catch (_) {
+              }
+            }
          }
        }
      }
-    } catch (e) {
-      // Migration failure shouldn't break the app
+    } catch (_) {
+      return;
     }
   }
-
-  @override
-  void dispose() {}
-}
+}
\ No newline at end of file