
Commit 1b9d988

feat!(commands): Add possibility for locking
1 parent c504c29 commit 1b9d988

13 files changed: +459 -23 lines changed


crates/core/src/backend.rs

Lines changed: 21 additions & 0 deletions
@@ -14,6 +14,7 @@ use std::{io::Read, ops::Deref, path::PathBuf, sync::Arc};
 
 use anyhow::Result;
 use bytes::Bytes;
+use chrono::{DateTime, Local};
 use enum_map::Enum;
 use log::trace;
 
@@ -337,6 +338,26 @@ pub trait WriteBackend: ReadBackend {
     ///
     /// The result of the removal.
     fn remove(&self, tpe: FileType, id: &Id, cacheable: bool) -> Result<()>;
+
+    /// Specify if the backend is able to lock files
+    fn can_lock(&self) -> bool {
+        false
+    }
+
+    /// Lock the given file.
+    ///
+    /// # Arguments
+    ///
+    /// * `tpe` - The type of the file.
+    /// * `id` - The id of the file.
+    /// * `until` - The date until when to lock. May be `None`, which usually specifies an unlimited lock.
+    ///
+    /// # Errors
+    ///
+    /// If the file could not be locked.
+    fn lock(&self, _tpe: FileType, _id: &Id, _until: Option<DateTime<Local>>) -> Result<()> {
+        Ok(())
+    }
 }
 
 #[cfg(test)]
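
The new trait hooks are deliberately opt-in: `can_lock` defaults to `false` and `lock` to a no-op, so existing backends compile unchanged. Below is a minimal sketch (not part of this commit) of how a caller might guard `lock` behind `can_lock`; the helper name `lock_if_supported` and the exact import paths are assumptions.

use anyhow::{bail, Result};
use chrono::{DateTime, Local};

// Assumed crate-internal paths for the items shown in the diff above.
use crate::{
    backend::{FileType, WriteBackend},
    Id,
};

/// Hypothetical helper: only call `lock` on backends that advertise support.
fn lock_if_supported(
    be: &impl WriteBackend,
    tpe: FileType,
    id: &Id,
    until: Option<DateTime<Local>>,
) -> Result<()> {
    if !be.can_lock() {
        bail!("backend does not support locking");
    }
    // `until = None` requests an unlimited lock, per the doc comment above.
    be.lock(tpe, id, until)
}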

crates/core/src/backend/decrypt.rs

Lines changed: 10 additions & 7 deletions
@@ -283,15 +283,18 @@ pub trait DecryptWriteBackend: WriteBackend + Clone + 'static {
         &self,
         list: I,
         p: impl Progress,
-    ) -> RusticResult<()> {
+    ) -> RusticResult<Vec<F::Id>> {
         p.set_length(list.len() as u64);
-        list.par_bridge().try_for_each(|file| -> RusticResult<_> {
-            _ = self.save_file(file)?;
-            p.inc(1);
-            Ok(())
-        })?;
+        let ids = list
+            .par_bridge()
+            .map(|file| -> RusticResult<F::Id> {
+                let id = self.save_file(file)?.into();
+                p.inc(1);
+                Ok(id)
+            })
+            .collect::<RusticResult<_>>()?;
         p.finish();
-        Ok(())
+        Ok(ids)
     }
 
     /// Deletes the given list of files.
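
`save_list` now returns the ids of the files it wrote instead of discarding them, which lets callers chain a follow-up step such as locking. A small illustrative sketch (the helper `save_and_collect` is made up, and it assumes `SnapshotFile::Id` is `SnapshotId`):

use crate::{
    backend::decrypt::DecryptWriteBackend,
    error::RusticResult,
    progress::ProgressBars,
    repofile::{SnapshotFile, SnapshotId},
};

/// Hypothetical caller that keeps the ids returned by `save_list` so a
/// follow-up step (e.g. locking) can reference the saved files.
fn save_and_collect(
    be: &impl DecryptWriteBackend,
    pb: &impl ProgressBars,
    snaps: &[SnapshotFile],
) -> RusticResult<Vec<SnapshotId>> {
    let p = pb.progress_counter("saving snapshots...");
    let ids = be.save_list(snaps.iter(), p)?;
    Ok(ids)
}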

crates/core/src/commands.rs

Lines changed: 1 addition & 0 deletions
@@ -12,6 +12,7 @@ pub mod dump;
 pub mod forget;
 pub mod init;
 pub mod key;
+pub mod lock;
 pub mod merge;
 pub mod prune;
 /// The `repair` command.

crates/core/src/commands/copy.rs

Lines changed: 2 additions & 3 deletions
@@ -4,7 +4,7 @@ use log::trace;
 use rayon::prelude::{IntoParallelRefIterator, ParallelBridge, ParallelIterator};
 
 use crate::{
-    backend::{decrypt::DecryptWriteBackend, node::NodeType},
+    backend::node::NodeType,
     blob::{packer::Packer, tree::TreeStreamerOnce, BlobId, BlobType},
     error::RusticResult,
     index::{indexer::Indexer, ReadIndex},
@@ -130,8 +130,7 @@ pub(crate) fn copy<'a, Q, R: IndexedFull, P: ProgressBars, S: IndexedIds>(
     _ = tree_packer.finalize()?;
     indexer.write().unwrap().finalize()?;
 
-    let p = pb.progress_counter("saving snapshots...");
-    be_dest.save_list(snaps.iter(), p)?;
+    let _ = repo_dest.save_snapshots(snaps)?;
     Ok(())
 }

crates/core/src/commands/forget.rs

Lines changed: 5 additions & 3 deletions
@@ -524,10 +524,12 @@ impl KeepOptions {
 
         while let Some(sn) = iter.next() {
             let (keep, reasons) = {
-                if sn.must_keep(now) {
-                    (true, vec!["snapshot"])
+                if sn.is_locked(now) {
+                    (true, vec!["locked"])
+                } else if sn.must_keep(now) {
+                    (true, vec!["delete mark"])
                 } else if sn.must_delete(now) {
-                    (false, vec!["snapshot"])
+                    (false, vec!["delete mark"])
                 } else {
                     let reasons =
                         group_keep.matches(&sn, last.as_ref(), iter.peek().is_some(), latest_time);

crates/core/src/commands/lock.rs

Lines changed: 279 additions & 0 deletions
@@ -0,0 +1,279 @@
+//! `lock` subcommand
+use std::collections::BTreeSet;
+
+use chrono::{DateTime, Local};
+use derive_setters::Setters;
+use log::error;
+use rayon::ThreadPoolBuilder;
+
+use crate::{
+    backend::{
+        decrypt::{DecryptReadBackend, DecryptWriteBackend},
+        node::NodeType,
+    },
+    blob::{tree::TreeStreamerOnce, BlobType},
+    error::{CommandErrorKind, RepositoryErrorKind, RusticResult},
+    index::{
+        binarysorted::{IndexCollector, IndexType},
+        indexer::Indexer,
+        GlobalIndex, ReadGlobalIndex,
+    },
+    progress::{Progress, ProgressBars},
+    repofile::{IndexFile, IndexId, KeyId, PackId, RepoId, SnapshotFile, SnapshotId},
+    repository::{Open, Repository},
+    BlobId, TreeId,
+};
+
+pub(super) mod constants {
+    /// The maximum number of reader threads to use for locking.
+    pub(super) const MAX_LOCKER_THREADS_NUM: usize = 20;
+}
+
+#[derive(Debug, Clone, Default, Copy, Setters)]
+/// Options for the `lock` command
+pub struct LockOptions {
+    /// Extend locks even if the files are already locked long enough
+    always_extend_lock: bool,
+
+    /// Specify until when to extend the lock. If None, lock forever
+    until: Option<DateTime<Local>>,
+}
+
+impl LockOptions {
+    /// Lock the given snapshots and corresponding pack files
+    pub fn lock<P: ProgressBars, S: Open>(
+        &self,
+        repo: &Repository<P, S>,
+        snapshots: &[SnapshotFile],
+        now: DateTime<Local>,
+    ) -> RusticResult<()> {
+        let pb = &repo.pb;
+        let be = repo.dbe();
+
+        let mut index_files = Vec::new();
+
+        let p = pb.progress_counter("reading index...");
+        let mut index_collector = IndexCollector::new(IndexType::Full);
+        for index in be.stream_all::<IndexFile>(&p)? {
+            let (id, index) = index?;
+            index_collector.extend(index.packs.clone());
+            index_files.push((id, index));
+        }
+        let index = GlobalIndex::new_from_index(index_collector.into_index());
+        p.finish();
+
+        let snap_trees = snapshots.iter().map(|sn| sn.tree).collect();
+        let packs = find_needed_packs(be, &index, snap_trees, pb)?;
+        self.lock_packs(repo, index_files, packs)?;
+
+        self.lock_snapshots(repo, snapshots, now)?;
+
+        Ok(())
+    }
+
+    fn lock_snapshots<P: ProgressBars, S: Open>(
+        &self,
+        repo: &Repository<P, S>,
+        snapshots: &[SnapshotFile],
+        now: DateTime<Local>,
+    ) -> RusticResult<()> {
+        let mut new_snaps = Vec::new();
+        let mut remove_snaps = Vec::new();
+        let mut lock_snaps = Vec::new();
+
+        for snap in snapshots {
+            if !snap.delete.is_locked(self.until) {
+                new_snaps.push(SnapshotFile {
+                    delete: self.until.into(),
+                    ..snap.clone()
+                });
+                if !snap.must_keep(now) {
+                    remove_snaps.push(snap.id);
+                }
+            } else if self.always_extend_lock {
+                lock_snaps.push(snap.id);
+            }
+        }
+
+        // save new snapshots
+        let new_ids = repo.save_snapshots(new_snaps)?;
+        lock_snaps.extend(new_ids);
+
+        // remove old snapshots
+        repo.delete_snapshots(&remove_snaps)?;
+
+        // Do the actual locking
+        lock_files(repo, &lock_snaps, self.until)?;
+
+        Ok(())
+    }
+
+    fn lock_packs<P: ProgressBars, S: Open>(
+        &self,
+        repo: &Repository<P, S>,
+        index_files: Vec<(IndexId, IndexFile)>,
+        packs: BTreeSet<PackId>,
+    ) -> RusticResult<()> {
+        let mut lock_packs = Vec::new();
+        let mut remove_index = Vec::new();
+
+        // Check for indexfiles-to-modify and for packs to lock
+        // Also already write the new index from the index files which are modified.
+        let p = repo.pb.progress_counter("processing index files...");
+        p.set_length(index_files.len().try_into().unwrap());
+        let indexer = Indexer::new_unindexed(repo.dbe().clone()).into_shared();
+        for (id, mut index) in index_files {
+            let mut modified = false;
+            for pack in &mut index.packs {
+                if !packs.contains(&pack.id) {
+                    continue;
+                }
+                if !pack.lock.is_locked(self.until) {
+                    pack.lock = self.until.into();
+                    modified = true;
+                    lock_packs.push(pack.id);
+                } else if self.always_extend_lock {
+                    lock_packs.push(pack.id);
+                }
+            }
+            if modified {
+                for pack in index.packs {
+                    indexer.write().unwrap().add(pack)?;
+                }
+                for pack_remove in index.packs_to_delete {
+                    indexer.write().unwrap().add_remove(pack_remove)?;
+                }
+                remove_index.push(id);
+            }
+            p.inc(1);
+        }
+        indexer.write().unwrap().finalize()?;
+        p.finish();
+
+        // Remove old index files
+        let p = repo.pb.progress_counter("removing old index files...");
+        repo.dbe().delete_list(true, remove_index.iter(), p)?;
+
+        // Do the actual locking
+        lock_files(repo, &lock_packs, self.until)?;
+
+        Ok(())
+    }
+}
+
+pub fn lock_repo<P: ProgressBars, S>(
+    repo: &Repository<P, S>,
+    until: Option<DateTime<Local>>,
+) -> RusticResult<()> {
+    lock_all_files::<P, S, KeyId>(repo, until)?;
+    lock_all_files::<P, S, SnapshotId>(repo, until)?;
+    lock_all_files::<P, S, IndexId>(repo, until)?;
+    lock_all_files::<P, S, PackId>(repo, until)?;
+    Ok(())
+}
+
+pub fn lock_all_files<P: ProgressBars, S, ID: RepoId>(
+    repo: &Repository<P, S>,
+    until: Option<DateTime<Local>>,
+) -> RusticResult<()> {
+    let p = &repo
+        .pb
+        .progress_spinner(format!("listing {:?} files..", ID::TYPE));
+    let ids: Vec<ID> = repo.list()?.collect();
+    p.finish();
+    lock_files(repo, &ids, until)
+}
+
+fn lock_files<P: ProgressBars, S, ID: RepoId>(
+    repo: &Repository<P, S>,
+    ids: &[ID],
+    until: Option<DateTime<Local>>,
+) -> RusticResult<()> {
+    let pool = ThreadPoolBuilder::new()
+        .num_threads(constants::MAX_LOCKER_THREADS_NUM)
+        .build()
+        .map_err(RepositoryErrorKind::FromThreadPoolbilderError)?;
+    let p = &repo
+        .pb
+        .progress_counter(format!("locking {:?} files..", ID::TYPE));
+    p.set_length(ids.len().try_into().unwrap());
+    let backend = &repo.be;
+    pool.in_place_scope(|scope| {
+        for id in ids {
+            scope.spawn(move |_| {
+                if let Err(e) = backend.lock(ID::TYPE, id, until) {
+                    // FIXME: Use error handling
+                    error!("lock failed for {:?} {id:?}. {e}", ID::TYPE);
+                };
+                p.inc(1);
+            });
+        }
+    });
+    p.finish();
+    Ok(())
+}
+
+/// Find packs which are needed for the given Trees
+///
+/// # Arguments
+///
+/// * `index` - The index to use
+/// * `trees` - The trees to consider
+/// * `pb` - The progress bars
+///
+/// # Errors
+///
+// TODO!: add errors!
+fn find_needed_packs(
+    be: &impl DecryptReadBackend,
+    index: &impl ReadGlobalIndex,
+    trees: Vec<TreeId>,
+    pb: &impl ProgressBars,
+) -> RusticResult<BTreeSet<PackId>> {
+    let p = pb.progress_counter("finding needed packs...");
+
+    let mut packs = BTreeSet::new();
+
+    for tree_id in &trees {
+        let blob_id = BlobId::from(*tree_id);
+        _ = packs.insert(
+            index
+                .get_id(BlobType::Tree, &blob_id)
+                .ok_or_else(|| CommandErrorKind::BlobIdNotFoundinIndex(blob_id))?
+                .pack,
+        );
+    }
+
+    let mut tree_streamer = TreeStreamerOnce::new(be, index, trees, p)?;
+    while let Some(item) = tree_streamer.next().transpose()? {
+        let (_, tree) = item;
+        for node in tree.nodes {
+            match node.node_type {
+                NodeType::File => {
+                    for id in node.content.iter().flatten() {
+                        let blob_id = BlobId::from(*id);
+                        _ = packs.insert(
+                            index
+                                .get_id(BlobType::Data, &blob_id)
+                                .ok_or_else(|| CommandErrorKind::BlobIdNotFoundinIndex(blob_id))?
+                                .pack,
+                        );
+                    }
+                }
+                NodeType::Dir => {
+                    let id = &node.subtree.unwrap();
+                    let blob_id = BlobId::from(*id);
+                    _ = packs.insert(
+                        index
+                            .get_id(BlobType::Tree, &blob_id)
+                            .ok_or_else(|| CommandErrorKind::BlobIdNotFoundinIndex(blob_id))?
+                            .pack,
+                    );
+                }
+                _ => {} // nothing to do
+            }
+        }
+    }
+
+    Ok(packs)
+}
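
The new command has two entry points: `LockOptions::lock` locks selected snapshots together with the pack files they reference, while `lock_repo` locks every file in the repository. A hedged usage sketch follows; it assumes the `derive_setters`-generated setters shown above, a `Repository::get_all_snapshots` accessor, and crate-internal import paths, and the function name and 30-day duration are purely illustrative.

use chrono::{Duration, Local};

use crate::{
    commands::lock::{lock_repo, LockOptions},
    error::RusticResult,
    progress::ProgressBars,
    repository::{Open, Repository},
};

/// Illustrative only: lock all snapshots (and the packs they need) for 30 days,
/// then additionally lock every repository file.
fn lock_everything_for_30_days<P: ProgressBars, S: Open>(
    repo: &Repository<P, S>,
) -> RusticResult<()> {
    let now = Local::now();
    let until = Some(now + Duration::days(30));

    // Lock selected snapshots and the pack files they reference.
    let snapshots = repo.get_all_snapshots()?;
    LockOptions::default()
        .until(until)
        .always_extend_lock(false)
        .lock(repo, &snapshots, now)?;

    // Alternatively, lock every repository file (keys, snapshots, index, packs).
    lock_repo(repo, until)?;

    Ok(())
}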

0 commit comments
