Skip to content

Commit 8a495e3

Browse files
committed
add lock command
1 parent 4194bb0 commit 8a495e3

File tree

4 files changed

+277
-0
lines changed

4 files changed

+277
-0
lines changed

crates/core/src/commands.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ pub mod dump;
1212
pub mod forget;
1313
pub mod init;
1414
pub mod key;
15+
pub mod lock;
1516
pub mod merge;
1617
pub mod prune;
1718
/// The `repair` command.

crates/core/src/commands/lock.rs

Lines changed: 256 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,256 @@
1+
//! `lock` subcommand
2+
use std::collections::BTreeSet;
3+
4+
use chrono::{DateTime, Local};
5+
use derive_setters::Setters;
6+
use log::error;
7+
use rayon::ThreadPoolBuilder;
8+
9+
use crate::{
10+
backend::{
11+
decrypt::{DecryptReadBackend, DecryptWriteBackend},
12+
node::NodeType,
13+
FileType,
14+
},
15+
blob::{tree::TreeStreamerOnce, BlobType},
16+
error::{CommandErrorKind, RepositoryErrorKind, RusticResult},
17+
id::Id,
18+
index::{
19+
binarysorted::{IndexCollector, IndexType},
20+
indexer::Indexer,
21+
GlobalIndex, ReadGlobalIndex,
22+
},
23+
progress::{Progress, ProgressBars},
24+
repofile::{indexfile::LockOption, DeleteOption, IndexFile, SnapshotFile},
25+
repository::{Open, Repository},
26+
};
27+
28+
pub(super) mod constants {
29+
/// The maximum number of reader threads to use for locking.
30+
pub(super) const MAX_LOCKER_THREADS_NUM: usize = 20;
31+
}
32+
33+
#[cfg_attr(feature = "clap", derive(clap::Parser))]
34+
#[derive(Debug, Clone, Copy, Setters)]
35+
/// Options for the `prune` command
36+
pub struct LockOptions {
37+
/// Extend locks even if the files are already locked long enough
38+
#[cfg_attr(feature = "clap", clap(long))]
39+
always_extend_lock: bool,
40+
41+
until: Option<DateTime<Local>>,
42+
}
43+
44+
impl LockOptions {
45+
pub fn lock<P: ProgressBars, S: Open>(
46+
&self,
47+
repo: &Repository<P, S>,
48+
snapshots: &[SnapshotFile],
49+
now: DateTime<Local>,
50+
) -> RusticResult<()> {
51+
let pb = &repo.pb;
52+
let be = repo.dbe();
53+
54+
let mut index_files = Vec::new();
55+
56+
let p = pb.progress_counter("reading index...");
57+
let mut index_collector = IndexCollector::new(IndexType::OnlyTrees);
58+
59+
for index in be.stream_all::<IndexFile>(&p)? {
60+
let (id, index) = index?;
61+
index_collector.extend(index.packs.clone());
62+
index_files.push((id, index));
63+
}
64+
p.finish();
65+
let index = GlobalIndex::new_from_index(index_collector.into_index());
66+
67+
let snap_tress = snapshots.iter().map(|sn| sn.tree).collect();
68+
let packs = find_needed_packs(be, &index, snap_tress, pb)?;
69+
self.lock_packs(repo, index_files, packs)?;
70+
71+
self.lock_snapshots(repo, snapshots, now)?;
72+
73+
Ok(())
74+
}
75+
76+
pub fn lock_snapshots<P: ProgressBars, S: Open>(
77+
&self,
78+
repo: &Repository<P, S>,
79+
snapshots: &[SnapshotFile],
80+
now: DateTime<Local>,
81+
) -> RusticResult<()> {
82+
let mut new_snaps = Vec::new();
83+
let mut remove_snaps = Vec::new();
84+
let mut lock_snaps = Vec::new();
85+
86+
let new_lock = DeleteOption::set_from_until(self.until);
87+
88+
for snap in snapshots {
89+
if !snap.must_keep(now) {
90+
remove_snaps.push(snap.id);
91+
}
92+
93+
if snap.delete.needs_lock_update(&new_lock) {
94+
new_snaps.push(SnapshotFile {
95+
delete: new_lock,
96+
..snap.clone()
97+
});
98+
} else if self.always_extend_lock {
99+
lock_snaps.push(snap.id);
100+
}
101+
}
102+
103+
// save new snapshots
104+
let new_ids = repo.save_snapshots(new_snaps)?;
105+
lock_snaps.extend(new_ids);
106+
107+
// remove old snapshots
108+
repo.delete_snapshots(&remove_snaps)?;
109+
110+
// Do the actual locking
111+
lock_files(repo, FileType::Snapshot, &lock_snaps, self.until)?;
112+
113+
Ok(())
114+
}
115+
116+
pub fn lock_packs<P: ProgressBars, S: Open>(
117+
&self,
118+
repo: &Repository<P, S>,
119+
index_files: Vec<(Id, IndexFile)>,
120+
packs: BTreeSet<Id>,
121+
) -> RusticResult<()> {
122+
let mut lock_packs = Vec::new();
123+
let mut remove_index = Vec::new();
124+
125+
// Check for indexfiles-to-modify and for packs to lock
126+
// Also already write the new index from the index files which are modified.
127+
let p = repo.pb.progress_counter("processing index files...");
128+
p.set_length(index_files.len().try_into().unwrap());
129+
let indexer = Indexer::new_unindexed(repo.dbe().clone()).into_shared();
130+
for (id, mut index) in index_files {
131+
let mut modified = false;
132+
for pack in &mut index.packs {
133+
if !packs.contains(&pack.id) {
134+
continue;
135+
}
136+
if !pack.lock.is_locked(self.until) {
137+
pack.lock = LockOption::set_from_until(self.until);
138+
modified = true;
139+
lock_packs.push(pack.id);
140+
} else if self.always_extend_lock {
141+
lock_packs.push(pack.id);
142+
}
143+
}
144+
if modified {
145+
for pack in index.packs {
146+
indexer.write().unwrap().add(pack)?;
147+
}
148+
for pack_remove in index.packs_to_delete {
149+
indexer.write().unwrap().add_remove(pack_remove)?;
150+
}
151+
remove_index.push(id);
152+
}
153+
p.inc(1);
154+
}
155+
indexer.write().unwrap().finalize()?;
156+
p.finish();
157+
158+
// Remove old index files
159+
let p = repo.pb.progress_counter("removing old index files...");
160+
repo.dbe()
161+
.delete_list(FileType::Index, true, remove_index.iter(), p)?;
162+
163+
// Do the actual locking
164+
lock_files(repo, FileType::Pack, &lock_packs, self.until)?;
165+
166+
Ok(())
167+
}
168+
}
169+
170+
fn lock_files<P: ProgressBars, S>(
171+
repo: &Repository<P, S>,
172+
file_type: FileType,
173+
ids: &[Id],
174+
until: Option<DateTime<Local>>,
175+
) -> RusticResult<()> {
176+
let pool = ThreadPoolBuilder::new()
177+
.num_threads(constants::MAX_LOCKER_THREADS_NUM)
178+
.build()
179+
.map_err(RepositoryErrorKind::FromThreadPoolbilderError)?;
180+
let progress_bar_ref = &repo.pb.progress_counter("locking {filetype:?} files..");
181+
let backend = &repo.be;
182+
pool.in_place_scope(|scope| {
183+
for id in ids {
184+
scope.spawn(move |_| {
185+
if let Err(e) = backend.lock(file_type, id, until) {
186+
// FIXME: Use error handling
187+
error!("lock failed for {file_type:?} {id:?}. {e}");
188+
};
189+
progress_bar_ref.inc(1);
190+
});
191+
}
192+
});
193+
Ok(())
194+
}
195+
196+
/// Find packs which are needed for the given Trees
197+
///
198+
/// # Arguments
199+
///
200+
/// * `index` - The index to use
201+
/// * `trees` - The trees to consider
202+
/// * `pb` - The progress bars
203+
///
204+
/// # Errors
205+
///
206+
// TODO!: add errors!
207+
fn find_needed_packs(
208+
be: &impl DecryptReadBackend,
209+
index: &impl ReadGlobalIndex,
210+
trees: Vec<Id>,
211+
pb: &impl ProgressBars,
212+
) -> RusticResult<BTreeSet<Id>> {
213+
let p = pb.progress_counter("finding needed packs...");
214+
215+
let mut packs = BTreeSet::new();
216+
217+
for tree_id in &trees {
218+
_ = packs.insert(
219+
index
220+
.get_id(BlobType::Tree, tree_id)
221+
.ok_or_else(|| CommandErrorKind::IdNotFoundinIndex(*tree_id))?
222+
.pack,
223+
);
224+
}
225+
226+
let mut tree_streamer = TreeStreamerOnce::new(be, index, trees, p)?;
227+
while let Some(item) = tree_streamer.next().transpose()? {
228+
let (_, tree) = item;
229+
for node in tree.nodes {
230+
match node.node_type {
231+
NodeType::File => {
232+
for id in node.content.iter().flatten() {
233+
_ = packs.insert(
234+
index
235+
.get_id(BlobType::Data, id)
236+
.ok_or_else(|| CommandErrorKind::IdNotFoundinIndex(*id))?
237+
.pack,
238+
);
239+
}
240+
}
241+
NodeType::Dir => {
242+
let id = &node.subtree.unwrap();
243+
_ = packs.insert(
244+
index
245+
.get_id(BlobType::Tree, id)
246+
.ok_or_else(|| CommandErrorKind::IdNotFoundinIndex(*id))?
247+
.pack,
248+
);
249+
}
250+
_ => {} // nothing to do
251+
}
252+
}
253+
}
254+
255+
Ok(packs)
256+
}

crates/core/src/error.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -214,6 +214,8 @@ pub enum CommandErrorKind {
214214
FromRayonError(#[from] rayon::ThreadPoolBuildError),
215215
/// conversion to `u64` failed: `{0:?}`
216216
ConversionToU64Failed(TryFromIntError),
217+
/// Id {0:?} not found in index
218+
IdNotFoundinIndex(Id),
217219
}
218220

219221
/// [`CryptoErrorKind`] describes the errors that can happen while dealing with Cryptographic functions

crates/core/src/repository.rs

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ use std::{
1010
};
1111

1212
use bytes::Bytes;
13+
use chrono::Local;
1314
use derive_setters::Setters;
1415
use log::{debug, error, info};
1516
use serde_with::{serde_as, DisplayFromStr};
@@ -37,6 +38,7 @@ use crate::{
3738
copy::CopySnapshot,
3839
forget::{ForgetGroups, KeepOptions},
3940
key::KeyOptions,
41+
lock::LockOptions,
4042
prune::{PruneOptions, PrunePlan},
4143
repair::{index::RepairIndexOptions, snapshots::RepairSnapshotsOptions},
4244
repoinfo::{IndexInfos, RepoFileInfos},
@@ -1058,6 +1060,22 @@ impl<P: ProgressBars, S: Open> Repository<P, S> {
10581060
opts.get_plan(self)
10591061
}
10601062

1063+
/// Lock snapshot and pack files needed for the given snapshots
1064+
///
1065+
/// # Arguments
1066+
///
1067+
/// * `opts` - The lock options to use
1068+
/// * `snaps` - The snapshots to lock
1069+
/// * `until` - until when to lock. None means lock forever.
1070+
///
1071+
/// # Errors
1072+
///
1073+
// TODO: Document errors
1074+
pub fn lock(&self, opts: &LockOptions, snaps: &[SnapshotFile]) -> RusticResult<()> {
1075+
let now = Local::now();
1076+
opts.lock(self, snaps, now)
1077+
}
1078+
10611079
/// Turn the repository into the `IndexedFull` state by reading and storing the index
10621080
///
10631081
/// # Errors

0 commit comments

Comments
 (0)