Skip to content

Commit 1a526bd

Browse files
committed
feat: Add lock_repo command
1 parent 213665e commit 1a526bd

File tree

10 files changed

+303
-7
lines changed

10 files changed

+303
-7
lines changed

crates/core/src/backend.rs

Lines changed: 30 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,15 +6,17 @@ pub(crate) mod dry_run;
66
pub(crate) mod hotcold;
77
pub(crate) mod ignore;
88
pub(crate) mod local_destination;
9+
pub(crate) mod lock;
910
pub(crate) mod node;
1011
pub(crate) mod stdin;
1112
pub(crate) mod warm_up;
1213

1314
use std::{io::Read, ops::Deref, path::PathBuf, sync::Arc};
1415

1516
use bytes::Bytes;
17+
use chrono::{DateTime, Local};
1618
use enum_map::Enum;
17-
use log::trace;
19+
use log::{debug, trace};
1820

1921
#[cfg(test)]
2022
use mockall::mock;
@@ -337,6 +339,27 @@ pub trait WriteBackend: ReadBackend {
337339
///
338340
/// The result of the removal.
339341
fn remove(&self, tpe: FileType, id: &Id, cacheable: bool) -> RusticResult<()>;
342+
343+
/// Specify if the backend is able to lock files
344+
fn can_lock(&self) -> bool {
345+
false
346+
}
347+
348+
/// Lock the given file.
349+
///
350+
/// # Arguments
351+
///
352+
/// * `tpe` - The type of the file.
353+
/// * `id` - The id of the file.
354+
/// * `until` - The date until when to lock. May be `None` which usually specifies a unlimited lock
355+
///
356+
/// # Errors
357+
///
358+
/// If the file could not be read.
359+
fn lock(&self, tpe: FileType, id: &Id, until: Option<DateTime<Local>>) -> RusticResult<()> {
360+
debug!("no locking implemented. {tpe:?}, {id}, {until:?}");
361+
Ok(())
362+
}
340363
}
341364

342365
#[cfg(test)]
@@ -374,6 +397,12 @@ impl WriteBackend for Arc<dyn WriteBackend> {
374397
fn remove(&self, tpe: FileType, id: &Id, cacheable: bool) -> RusticResult<()> {
375398
self.deref().remove(tpe, id, cacheable)
376399
}
400+
fn can_lock(&self) -> bool {
401+
self.deref().can_lock()
402+
}
403+
fn lock(&self, tpe: FileType, id: &Id, until: Option<DateTime<Local>>) -> RusticResult<()> {
404+
self.deref().lock(tpe, id, until)
405+
}
377406
}
378407

379408
impl ReadBackend for Arc<dyn WriteBackend> {

crates/core/src/backend/cache.rs

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ use std::{
77
};
88

99
use bytes::Bytes;
10+
use chrono::{DateTime, Local};
1011
use dirs::cache_dir;
1112
use log::{trace, warn};
1213
use walkdir::WalkDir;
@@ -226,6 +227,14 @@ impl WriteBackend for CachedBackend {
226227
}
227228
self.be.remove(tpe, id, cacheable)
228229
}
230+
231+
fn can_lock(&self) -> bool {
232+
self.be.can_lock()
233+
}
234+
235+
fn lock(&self, tpe: FileType, id: &Id, until: Option<DateTime<Local>>) -> RusticResult<()> {
236+
self.be.lock(tpe, id, until)
237+
}
229238
}
230239

231240
/// Backend that caches data in a directory.

crates/core/src/backend/decrypt.rs

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
use std::{num::NonZeroU32, sync::Arc};
22

33
use bytes::Bytes;
4+
use chrono::{DateTime, Local};
45
use crossbeam_channel::{Receiver, unbounded};
56
use rayon::prelude::*;
67
use zstd::stream::{copy_encode, decode_all, encode_all};
@@ -636,6 +637,14 @@ impl<C: CryptoKey> WriteBackend for DecryptBackend<C> {
636637
fn remove(&self, tpe: FileType, id: &Id, cacheable: bool) -> RusticResult<()> {
637638
self.be.remove(tpe, id, cacheable)
638639
}
640+
641+
fn can_lock(&self) -> bool {
642+
self.be.can_lock()
643+
}
644+
645+
fn lock(&self, tpe: FileType, id: &Id, until: Option<DateTime<Local>>) -> RusticResult<()> {
646+
self.be.lock(tpe, id, until)
647+
}
639648
}
640649

641650
#[cfg(test)]

crates/core/src/backend/dry_run.rs

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
use bytes::Bytes;
2+
use chrono::{DateTime, Local};
23
use zstd::decode_all;
34

45
use crate::{
@@ -164,4 +165,12 @@ impl<BE: DecryptFullBackend> WriteBackend for DryRunBackend<BE> {
164165
self.be.remove(tpe, id, cacheable)
165166
}
166167
}
168+
169+
fn can_lock(&self) -> bool {
170+
self.be.can_lock()
171+
}
172+
173+
fn lock(&self, tpe: FileType, id: &Id, until: Option<DateTime<Local>>) -> RusticResult<()> {
174+
self.be.lock(tpe, id, until)
175+
}
167176
}

crates/core/src/backend/hotcold.rs

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
use std::sync::Arc;
22

33
use bytes::Bytes;
4+
use chrono::{DateTime, Local};
45

56
use crate::{
67
backend::{FileType, ReadBackend, WriteBackend},
@@ -98,4 +99,12 @@ impl WriteBackend for HotColdBackend {
9899
}
99100
Ok(())
100101
}
102+
103+
fn can_lock(&self) -> bool {
104+
self.be.can_lock()
105+
}
106+
107+
fn lock(&self, tpe: FileType, id: &Id, until: Option<DateTime<Local>>) -> RusticResult<()> {
108+
self.be.lock(tpe, id, until)
109+
}
101110
}

crates/core/src/backend/lock.rs

Lines changed: 123 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,123 @@
1+
use std::{process::Command, sync::Arc};
2+
3+
use bytes::Bytes;
4+
use chrono::{DateTime, Local};
5+
use log::{debug, warn};
6+
7+
use crate::{
8+
CommandInput, ErrorKind, RusticError, RusticResult,
9+
backend::{FileType, ReadBackend, WriteBackend},
10+
id::Id,
11+
};
12+
13+
/// A backend which warms up files by simply accessing them.
14+
#[derive(Clone, Debug)]
15+
pub struct LockBackend {
16+
/// The backend to use.
17+
be: Arc<dyn WriteBackend>,
18+
/// The command to be called to lock files in the backend
19+
command: CommandInput,
20+
}
21+
22+
impl LockBackend {
23+
/// Creates a new `WarmUpAccessBackend`.
24+
///
25+
/// # Arguments
26+
///
27+
/// * `be` - The backend to use.
28+
pub fn new_lock(be: Arc<dyn WriteBackend>, command: CommandInput) -> Arc<dyn WriteBackend> {
29+
Arc::new(Self { be, command })
30+
}
31+
}
32+
33+
impl ReadBackend for LockBackend {
34+
fn location(&self) -> String {
35+
self.be.location()
36+
}
37+
38+
fn list_with_size(&self, tpe: FileType) -> RusticResult<Vec<(Id, u32)>> {
39+
self.be.list_with_size(tpe)
40+
}
41+
42+
fn read_full(&self, tpe: FileType, id: &Id) -> RusticResult<Bytes> {
43+
self.be.read_full(tpe, id)
44+
}
45+
46+
fn read_partial(
47+
&self,
48+
tpe: FileType,
49+
id: &Id,
50+
cacheable: bool,
51+
offset: u32,
52+
length: u32,
53+
) -> RusticResult<Bytes> {
54+
self.be.read_partial(tpe, id, cacheable, offset, length)
55+
}
56+
57+
fn list(&self, tpe: FileType) -> RusticResult<Vec<Id>> {
58+
self.be.list(tpe)
59+
}
60+
61+
fn needs_warm_up(&self) -> bool {
62+
self.be.needs_warm_up()
63+
}
64+
65+
fn warm_up(&self, tpe: FileType, id: &Id) -> RusticResult<()> {
66+
self.be.warm_up(tpe, id)
67+
}
68+
}
69+
70+
fn path(tpe: FileType, id: &Id) -> String {
71+
let hex_id = id.to_hex();
72+
match tpe {
73+
FileType::Config => "config".into(),
74+
FileType::Pack => format!("data/{}/{}", &hex_id[0..2], &*hex_id),
75+
_ => format!("{}/{}", tpe.dirname(), &*hex_id),
76+
}
77+
}
78+
79+
impl WriteBackend for LockBackend {
80+
fn create(&self) -> RusticResult<()> {
81+
self.be.create()
82+
}
83+
84+
fn write_bytes(&self, tpe: FileType, id: &Id, cacheable: bool, buf: Bytes) -> RusticResult<()> {
85+
self.be.write_bytes(tpe, id, cacheable, buf)
86+
}
87+
88+
fn remove(&self, tpe: FileType, id: &Id, cacheable: bool) -> RusticResult<()> {
89+
self.be.remove(tpe, id, cacheable)
90+
}
91+
92+
fn can_lock(&self) -> bool {
93+
true
94+
}
95+
96+
fn lock(&self, tpe: FileType, id: &Id, until: Option<DateTime<Local>>) -> RusticResult<()> {
97+
let until = until.map_or_else(String::new, |u| u.to_rfc3339());
98+
let path = path(tpe, id);
99+
let args = self.command.args().iter().map(|c| {
100+
c.replace("%id", &id.to_hex())
101+
.replace("%type", tpe.dirname())
102+
.replace("%path", &path)
103+
.replace("%until", &until)
104+
});
105+
debug!("calling {:?}...", self.command);
106+
let status = Command::new(self.command.command())
107+
.args(args)
108+
.status()
109+
.map_err(|err| {
110+
RusticError::with_source(
111+
ErrorKind::Internal,
112+
"error calling lock command for {tpe}, id: {id}.",
113+
err,
114+
)
115+
.attach_context("tpe", tpe.to_string())
116+
.attach_context("id", id.to_string())
117+
})?;
118+
if !status.success() {
119+
warn!("lock command was not successful for {tpe:?}, id: {id}. {status}");
120+
}
121+
Ok(())
122+
}
123+
}

crates/core/src/commands.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ pub mod dump;
1212
pub mod forget;
1313
pub mod init;
1414
pub mod key;
15+
pub mod lock;
1516
pub mod merge;
1617
pub mod prune;
1718
/// The `repair` command.

crates/core/src/commands/lock.rs

Lines changed: 84 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,84 @@
1+
//! `lock` subcommand
2+
3+
use chrono::{DateTime, Local};
4+
use log::error;
5+
use rayon::ThreadPoolBuilder;
6+
7+
use crate::{
8+
ErrorKind, RusticError, WriteBackend,
9+
error::RusticResult,
10+
progress::{Progress, ProgressBars},
11+
repofile::{IndexId, KeyId, PackId, RepoId, SnapshotId, configfile::ConfigId},
12+
repository::Repository,
13+
};
14+
15+
pub(super) mod constants {
16+
/// The maximum number of reader threads to use for locking.
17+
pub(super) const MAX_LOCKER_THREADS_NUM: usize = 20;
18+
}
19+
20+
pub fn lock_repo<P: ProgressBars, S>(
21+
repo: &Repository<P, S>,
22+
until: Option<DateTime<Local>>,
23+
) -> RusticResult<()> {
24+
lock_all_files::<P, S, ConfigId>(repo, until)?;
25+
lock_all_files::<P, S, KeyId>(repo, until)?;
26+
lock_all_files::<P, S, SnapshotId>(repo, until)?;
27+
lock_all_files::<P, S, IndexId>(repo, until)?;
28+
lock_all_files::<P, S, PackId>(repo, until)?;
29+
Ok(())
30+
}
31+
32+
pub fn lock_all_files<P: ProgressBars, S, ID: RepoId + std::fmt::Debug>(
33+
repo: &Repository<P, S>,
34+
until: Option<DateTime<Local>>,
35+
) -> RusticResult<()> {
36+
if !repo.be.can_lock() {
37+
return Err(RusticError::new(
38+
ErrorKind::Internal,
39+
"Tried to call lock_all_files on a backend which isn't able to lock.",
40+
));
41+
}
42+
43+
let p = &repo
44+
.pb
45+
.progress_spinner(format!("listing {:?} files..", ID::TYPE));
46+
let ids: Vec<ID> = repo.list()?.collect();
47+
p.finish();
48+
lock_files(repo, &ids, until)
49+
}
50+
51+
fn lock_files<P: ProgressBars, S, ID: RepoId + std::fmt::Debug>(
52+
repo: &Repository<P, S>,
53+
ids: &[ID],
54+
until: Option<DateTime<Local>>,
55+
) -> RusticResult<()> {
56+
let pool = ThreadPoolBuilder::new()
57+
.num_threads(constants::MAX_LOCKER_THREADS_NUM)
58+
.build()
59+
.map_err(|err| {
60+
RusticError::with_source(
61+
ErrorKind::Internal,
62+
"Failed to create thread pool for warm-up. Please try again.",
63+
err,
64+
)
65+
})?;
66+
let p = &repo
67+
.pb
68+
.progress_counter(format!("locking {:?} files..", ID::TYPE));
69+
p.set_length(ids.len().try_into().unwrap());
70+
let backend = &repo.be;
71+
pool.in_place_scope(|scope| {
72+
for id in ids {
73+
scope.spawn(move |_| {
74+
if let Err(e) = backend.lock(ID::TYPE, id, until) {
75+
// FIXME: Use error handling
76+
error!("lock failed for {:?} {id:?}. {e}", ID::TYPE);
77+
}
78+
p.inc(1);
79+
});
80+
}
81+
});
82+
p.finish();
83+
Ok(())
84+
}

0 commit comments

Comments
 (0)