22
33import lombok .RequiredArgsConstructor ;
44import lombok .extern .slf4j .Slf4j ;
5- import org .lowcoder .domain .application .model .ApplicationHistorySnapshot ;
6- import org .lowcoder .domain .application .model .ApplicationHistorySnapshotTS ;
75import org .lowcoder .sdk .config .CommonConfig ;
86import org .springframework .data .mongodb .core .MongoTemplate ;
9- import org .springframework .data .mongodb .core .query .Criteria ;
10- import org .springframework .data .mongodb .core .query .Query ;
117import org .springframework .scheduling .annotation .Scheduled ;
128import org .springframework .stereotype .Component ;
139
1612import java .util .List ;
1713import java .util .concurrent .TimeUnit ;
1814
15+ import com .mongodb .client .MongoCollection ;
16+ import com .mongodb .client .MongoCursor ;
17+ import com .mongodb .client .model .Filters ;
18+ import org .bson .Document ;
19+
1920@ Slf4j
2021@ RequiredArgsConstructor
2122@ Component
@@ -24,23 +25,122 @@ public class ArchiveSnapshotTask {
2425 private final CommonConfig commonConfig ;
2526 private final MongoTemplate mongoTemplate ;
2627
27- @ Scheduled (initialDelay = 1 , fixedRate = 1 , timeUnit = TimeUnit .DAYS )
28+ @ Scheduled (initialDelay = 0 , fixedRate = 1 , timeUnit = TimeUnit .DAYS )
2829 public void archive () {
30+ int mongoVersion = getMongoDBVersion ();
2931 Instant thresholdDate = Instant .now ().minus (commonConfig .getQuery ().getAppSnapshotKeepDuration (), ChronoUnit .DAYS );
30- List <ApplicationHistorySnapshotTS > snapshots = mongoTemplate .find (new Query ().addCriteria (Criteria .where ("createdAt" ).lte (thresholdDate )), ApplicationHistorySnapshotTS .class );
31- snapshots .forEach (snapshot -> {
32- ApplicationHistorySnapshot applicationHistorySnapshot = new ApplicationHistorySnapshot ();
33- applicationHistorySnapshot .setApplicationId (snapshot .getApplicationId ());
34- applicationHistorySnapshot .setDsl (snapshot .getDsl ());
35- applicationHistorySnapshot .setContext (snapshot .getContext ());
36- applicationHistorySnapshot .setCreatedAt (snapshot .getCreatedAt ());
37- applicationHistorySnapshot .setCreatedBy (snapshot .getCreatedBy ());
38- applicationHistorySnapshot .setModifiedBy (snapshot .getModifiedBy ());
39- applicationHistorySnapshot .setUpdatedAt (snapshot .getUpdatedAt ());
40- applicationHistorySnapshot .setId (snapshot .getId ());
41- mongoTemplate .insert (applicationHistorySnapshot );
42- mongoTemplate .remove (snapshot );
43- });
32+
33+ if (mongoVersion >= 5 ) {
34+ archiveForVersion5AndAbove (thresholdDate );
35+ } else {
36+ archiveForVersionBelow5 (thresholdDate );
37+ }
38+ }
39+
40+ private int getMongoDBVersion () {
41+ Document buildInfo = mongoTemplate .getDb ().runCommand (new Document ("buildInfo" , 1 ));
42+ String version = buildInfo .getString ("version" );
43+ return Integer .parseInt (version .split ("\\ ." )[0 ]); // Parse major version
4444 }
4545
46+ private void archiveForVersion5AndAbove (Instant thresholdDate ) {
47+ log .info ("Running archival for MongoDB version >= 5" );
48+
49+ MongoCollection <Document > sourceCollection = mongoTemplate .getDb ().getCollection ("applicationHistorySnapshotTS" );
50+ MongoCollection <Document > targetCollection = mongoTemplate .getDb ().getCollection ("applicationHistorySnapshot" );
51+
52+ long totalDocuments = sourceCollection .countDocuments (Filters .lte ("createdAt" , thresholdDate ));
53+ log .info ("Total documents to archive: {}" , totalDocuments );
54+
55+ long processedCount = 0 ;
56+
57+ try (MongoCursor <Document > cursor = sourceCollection .find (Filters .lte ("createdAt" , thresholdDate )).iterator ()) {
58+ while (cursor .hasNext ()) {
59+ Document document = cursor .next ();
60+
61+ // Transform the document for the target collection
62+ document .put ("id" , document .getObjectId ("_id" )); // Map `_id` to `id`
63+ document .remove ("_id" );
64+
65+ // Insert the document into the target collection
66+ try {
67+ targetCollection .insertOne (document );
68+ } catch (Exception e ) {
69+ log .error ("Failed to insert document with ID {}. Error: {}" , document .getObjectId ("id" ), e .getMessage ());
70+ continue ;
71+ }
72+
73+ // Remove the document from the source collection
74+ try {
75+ sourceCollection .deleteOne (Filters .eq ("_id" , document .getObjectId ("id" )));
76+ } catch (Exception e ) {
77+ log .error ("Failed to delete document with ID {}. Error: {}" , document .getObjectId ("id" ), e .getMessage ());
78+ continue ;
79+ }
80+
81+ processedCount ++;
82+ log .info ("Processed document {} / {}" , processedCount , totalDocuments );
83+ }
84+ } catch (Exception e ) {
85+ log .error ("Failed during archival process. Error: {}" , e .getMessage ());
86+ }
87+
88+ log .info ("Archival process completed. Total documents archived: {}" , processedCount );
89+ }
90+
91+ private void archiveForVersionBelow5 (Instant thresholdDate ) {
92+ log .info ("Running archival for MongoDB version < 5" );
93+
94+ MongoCollection <Document > sourceCollection = mongoTemplate .getDb ().getCollection ("applicationHistorySnapshotTS" );
95+
96+ long totalDocuments = sourceCollection .countDocuments (Filters .lte ("createdAt" , thresholdDate ));
97+ log .info ("Total documents to archive: {}" , totalDocuments );
98+
99+ long processedCount = 0 ;
100+
101+ try (MongoCursor <Document > cursor = sourceCollection .find (Filters .lte ("createdAt" , thresholdDate )).iterator ()) {
102+ while (cursor .hasNext ()) {
103+ Document document = cursor .next ();
104+
105+ // Transform the document for the target collection
106+ document .put ("id" , document .getObjectId ("_id" )); // Map `_id` to `id`
107+ document .remove ("_id" );
108+
109+ // Use aggregation with $out for the single document
110+ try {
111+ sourceCollection .aggregate (List .of (
112+ Filters .eq ("_id" , document .getObjectId ("id" )),
113+ new Document ("$project" , new Document ()
114+ .append ("applicationId" , document .get ("applicationId" ))
115+ .append ("dsl" , document .get ("dsl" ))
116+ .append ("context" , document .get ("context" ))
117+ .append ("createdAt" , document .get ("createdAt" ))
118+ .append ("createdBy" , document .get ("createdBy" ))
119+ .append ("modifiedBy" , document .get ("modifiedBy" ))
120+ .append ("updatedAt" , document .get ("updatedAt" ))
121+ .append ("id" , document .get ("id" ))),
122+ new Document ("$out" , "applicationHistorySnapshot" )
123+ )).first ();
124+ } catch (Exception e ) {
125+ log .error ("Failed to aggregate and insert document with ID {}. Error: {}" , document .getObjectId ("id" ), e .getMessage ());
126+ continue ;
127+ }
128+
129+ // Remove the document from the source collection
130+ try {
131+ sourceCollection .deleteOne (Filters .eq ("_id" , document .getObjectId ("id" )));
132+ } catch (Exception e ) {
133+ log .error ("Failed to delete document with ID {}. Error: {}" , document .getObjectId ("id" ), e .getMessage ());
134+ continue ;
135+ }
136+
137+ processedCount ++;
138+ log .info ("Processed document {} / {}" , processedCount , totalDocuments );
139+ }
140+ } catch (Exception e ) {
141+ log .error ("Failed during archival process. Error: {}" , e .getMessage ());
142+ }
143+
144+ log .info ("Archival process completed. Total documents archived: {}" , processedCount );
145+ }
46146}
0 commit comments