Skip to content

Commit 1f8cafc

Browse files
committed
feat: Add lock_repo command
1 parent e84373e commit 1f8cafc

File tree

11 files changed

+285
-6
lines changed

11 files changed

+285
-6
lines changed

crates/core/src/backend.rs

Lines changed: 30 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ pub(crate) mod dry_run;
66
pub(crate) mod hotcold;
77
pub(crate) mod ignore;
88
pub(crate) mod local_destination;
9+
pub(crate) mod lock;
910
pub(crate) mod node;
1011
pub(crate) mod stdin;
1112
pub(crate) mod warm_up;
@@ -14,8 +15,9 @@ use std::{io::Read, ops::Deref, path::PathBuf, sync::Arc};
1415

1516
use anyhow::Result;
1617
use bytes::Bytes;
18+
use chrono::{DateTime, Local};
1719
use enum_map::Enum;
18-
use log::trace;
20+
use log::{debug, trace};
1921

2022
#[cfg(test)]
2123
use mockall::mock;
@@ -337,6 +339,27 @@ pub trait WriteBackend: ReadBackend {
337339
///
338340
/// The result of the removal.
339341
fn remove(&self, tpe: FileType, id: &Id, cacheable: bool) -> Result<()>;
342+
343+
/// Specify if the backend is able to lock files
344+
fn can_lock(&self) -> bool {
345+
false
346+
}
347+
348+
/// Lock the given file.
349+
///
350+
/// # Arguments
351+
///
352+
/// * `tpe` - The type of the file.
353+
/// * `id` - The id of the file.
354+
/// * `until` - The date until when to lock. May be `None` which usually specifies a unlimited lock
355+
///
356+
/// # Errors
357+
///
358+
/// If the file could not be read.
359+
fn lock(&self, tpe: FileType, id: &Id, until: Option<DateTime<Local>>) -> Result<()> {
360+
debug!("no locking implemented. {tpe:?}, {id}, {until:?}");
361+
Ok(())
362+
}
340363
}
341364

342365
#[cfg(test)]
@@ -374,6 +397,12 @@ impl WriteBackend for Arc<dyn WriteBackend> {
374397
fn remove(&self, tpe: FileType, id: &Id, cacheable: bool) -> Result<()> {
375398
self.deref().remove(tpe, id, cacheable)
376399
}
400+
fn can_lock(&self) -> bool {
401+
self.deref().can_lock()
402+
}
403+
fn lock(&self, tpe: FileType, id: &Id, until: Option<DateTime<Local>>) -> Result<()> {
404+
self.deref().lock(tpe, id, until)
405+
}
377406
}
378407

379408
impl ReadBackend for Arc<dyn WriteBackend> {

crates/core/src/backend/cache.rs

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ use std::{
88

99
use anyhow::Result;
1010
use bytes::Bytes;
11+
use chrono::{DateTime, Local};
1112
use dirs::cache_dir;
1213
use log::{trace, warn};
1314
use walkdir::WalkDir;
@@ -210,6 +211,14 @@ impl WriteBackend for CachedBackend {
210211
}
211212
self.be.remove(tpe, id, cacheable)
212213
}
214+
215+
fn can_lock(&self) -> bool {
216+
self.be.can_lock()
217+
}
218+
219+
fn lock(&self, tpe: FileType, id: &Id, until: Option<DateTime<Local>>) -> Result<()> {
220+
self.be.lock(tpe, id, until)
221+
}
213222
}
214223

215224
/// Backend that caches data in a directory.

crates/core/src/backend/decrypt.rs

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ use std::{num::NonZeroU32, sync::Arc};
22

33
use anyhow::Result;
44
use bytes::Bytes;
5+
use chrono::{DateTime, Local};
56
use crossbeam_channel::{unbounded, Receiver};
67
use rayon::prelude::*;
78
use zstd::stream::{copy_encode, decode_all, encode_all};
@@ -578,6 +579,14 @@ impl<C: CryptoKey> WriteBackend for DecryptBackend<C> {
578579
fn remove(&self, tpe: FileType, id: &Id, cacheable: bool) -> Result<()> {
579580
self.be.remove(tpe, id, cacheable)
580581
}
582+
583+
fn can_lock(&self) -> bool {
584+
self.be.can_lock()
585+
}
586+
587+
fn lock(&self, tpe: FileType, id: &Id, until: Option<DateTime<Local>>) -> Result<()> {
588+
self.be.lock(tpe, id, until)
589+
}
581590
}
582591

583592
#[cfg(test)]

crates/core/src/backend/dry_run.rs

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
use anyhow::Result;
22
use bytes::Bytes;
3+
use chrono::{DateTime, Local};
34
use zstd::decode_all;
45

56
use crate::{
@@ -156,4 +157,12 @@ impl<BE: DecryptFullBackend> WriteBackend for DryRunBackend<BE> {
156157
self.be.remove(tpe, id, cacheable)
157158
}
158159
}
160+
161+
fn can_lock(&self) -> bool {
162+
self.be.can_lock()
163+
}
164+
165+
fn lock(&self, tpe: FileType, id: &Id, until: Option<DateTime<Local>>) -> Result<()> {
166+
self.be.lock(tpe, id, until)
167+
}
159168
}

crates/core/src/backend/hotcold.rs

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ use std::sync::Arc;
22

33
use anyhow::Result;
44
use bytes::Bytes;
5+
use chrono::{DateTime, Local};
56

67
use crate::{
78
backend::{FileType, ReadBackend, WriteBackend},
@@ -98,4 +99,12 @@ impl WriteBackend for HotColdBackend {
9899
}
99100
Ok(())
100101
}
102+
103+
fn can_lock(&self) -> bool {
104+
self.be.can_lock()
105+
}
106+
107+
fn lock(&self, tpe: FileType, id: &Id, until: Option<DateTime<Local>>) -> Result<()> {
108+
self.be.lock(tpe, id, until)
109+
}
101110
}

crates/core/src/backend/lock.rs

Lines changed: 113 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,113 @@
1+
use std::{process::Command, sync::Arc};
2+
3+
use anyhow::Result;
4+
use bytes::Bytes;
5+
use chrono::{DateTime, Local};
6+
use log::{debug, warn};
7+
8+
use crate::{
9+
backend::{FileType, ReadBackend, WriteBackend},
10+
id::Id,
11+
CommandInput,
12+
};
13+
14+
/// A backend which warms up files by simply accessing them.
15+
#[derive(Clone, Debug)]
16+
pub struct LockBackend {
17+
/// The backend to use.
18+
be: Arc<dyn WriteBackend>,
19+
/// The command to be called to lock files in the backend
20+
command: CommandInput,
21+
}
22+
23+
impl LockBackend {
24+
/// Creates a new `WarmUpAccessBackend`.
25+
///
26+
/// # Arguments
27+
///
28+
/// * `be` - The backend to use.
29+
pub fn new_lock(be: Arc<dyn WriteBackend>, command: CommandInput) -> Arc<dyn WriteBackend> {
30+
Arc::new(Self { be, command })
31+
}
32+
}
33+
34+
impl ReadBackend for LockBackend {
35+
fn location(&self) -> String {
36+
self.be.location()
37+
}
38+
39+
fn list_with_size(&self, tpe: FileType) -> Result<Vec<(Id, u32)>> {
40+
self.be.list_with_size(tpe)
41+
}
42+
43+
fn read_full(&self, tpe: FileType, id: &Id) -> Result<Bytes> {
44+
self.be.read_full(tpe, id)
45+
}
46+
47+
fn read_partial(
48+
&self,
49+
tpe: FileType,
50+
id: &Id,
51+
cacheable: bool,
52+
offset: u32,
53+
length: u32,
54+
) -> Result<Bytes> {
55+
self.be.read_partial(tpe, id, cacheable, offset, length)
56+
}
57+
58+
fn list(&self, tpe: FileType) -> Result<Vec<Id>> {
59+
self.be.list(tpe)
60+
}
61+
62+
fn needs_warm_up(&self) -> bool {
63+
self.be.needs_warm_up()
64+
}
65+
66+
fn warm_up(&self, tpe: FileType, id: &Id) -> Result<()> {
67+
self.be.warm_up(tpe, id)
68+
}
69+
}
70+
71+
fn path(tpe: FileType, id: &Id) -> String {
72+
let hex_id = id.to_hex();
73+
match tpe {
74+
FileType::Config => "config".into(),
75+
FileType::Pack => format!("data/{}/{}", &hex_id[0..2], &*hex_id),
76+
_ => format!("{}/{}", tpe.dirname(), &*hex_id),
77+
}
78+
}
79+
80+
impl WriteBackend for LockBackend {
81+
fn create(&self) -> Result<()> {
82+
self.be.create()
83+
}
84+
85+
fn write_bytes(&self, tpe: FileType, id: &Id, cacheable: bool, buf: Bytes) -> Result<()> {
86+
self.be.write_bytes(tpe, id, cacheable, buf)
87+
}
88+
89+
fn remove(&self, tpe: FileType, id: &Id, cacheable: bool) -> Result<()> {
90+
self.be.remove(tpe, id, cacheable)
91+
}
92+
93+
fn can_lock(&self) -> bool {
94+
true
95+
}
96+
97+
fn lock(&self, tpe: FileType, id: &Id, until: Option<DateTime<Local>>) -> Result<()> {
98+
let until = until.map_or_else(|| String::new(), |u| format!("{}", u.to_rfc3339()));
99+
let path = path(tpe, id);
100+
let args = self.command.args().iter().map(|c| {
101+
c.replace("%id", &id.to_hex())
102+
.replace("%type", tpe.dirname())
103+
.replace("%path", &path)
104+
.replace("%until", &until)
105+
});
106+
debug!("calling {:?}...", self.command);
107+
let status = Command::new(self.command.command()).args(args).status()?;
108+
if !status.success() {
109+
warn!("lock command was not successful for {tpe:?}, id: {id}. {status}");
110+
}
111+
Ok(())
112+
}
113+
}

crates/core/src/commands.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ pub mod dump;
1212
pub mod forget;
1313
pub mod init;
1414
pub mod key;
15+
pub mod lock;
1516
pub mod merge;
1617
pub mod prune;
1718
/// The `repair` command.

crates/core/src/commands/lock.rs

Lines changed: 75 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,75 @@
1+
//! `lock` subcommand
2+
3+
use chrono::{DateTime, Local};
4+
use log::error;
5+
use rayon::ThreadPoolBuilder;
6+
7+
use crate::{
8+
error::{CommandErrorKind, RepositoryErrorKind, RusticResult},
9+
progress::{Progress, ProgressBars},
10+
repofile::{configfile::ConfigId, IndexId, KeyId, PackId, RepoId, SnapshotId},
11+
repository::Repository,
12+
WriteBackend,
13+
};
14+
15+
pub(super) mod constants {
16+
/// The maximum number of reader threads to use for locking.
17+
pub(super) const MAX_LOCKER_THREADS_NUM: usize = 20;
18+
}
19+
20+
pub fn lock_repo<P: ProgressBars, S>(
21+
repo: &Repository<P, S>,
22+
until: Option<DateTime<Local>>,
23+
) -> RusticResult<()> {
24+
lock_all_files::<P, S, ConfigId>(repo, until)?;
25+
lock_all_files::<P, S, KeyId>(repo, until)?;
26+
lock_all_files::<P, S, SnapshotId>(repo, until)?;
27+
lock_all_files::<P, S, IndexId>(repo, until)?;
28+
lock_all_files::<P, S, PackId>(repo, until)?;
29+
Ok(())
30+
}
31+
32+
pub fn lock_all_files<P: ProgressBars, S, ID: RepoId + std::fmt::Debug>(
33+
repo: &Repository<P, S>,
34+
until: Option<DateTime<Local>>,
35+
) -> RusticResult<()> {
36+
if !repo.be.can_lock() {
37+
return Err(CommandErrorKind::NoLockingConfigured.into());
38+
}
39+
40+
let p = &repo
41+
.pb
42+
.progress_spinner(format!("listing {:?} files..", ID::TYPE));
43+
let ids: Vec<ID> = repo.list()?.collect();
44+
p.finish();
45+
lock_files(repo, &ids, until)
46+
}
47+
48+
fn lock_files<P: ProgressBars, S, ID: RepoId + std::fmt::Debug>(
49+
repo: &Repository<P, S>,
50+
ids: &[ID],
51+
until: Option<DateTime<Local>>,
52+
) -> RusticResult<()> {
53+
let pool = ThreadPoolBuilder::new()
54+
.num_threads(constants::MAX_LOCKER_THREADS_NUM)
55+
.build()
56+
.map_err(RepositoryErrorKind::FromThreadPoolbilderError)?;
57+
let p = &repo
58+
.pb
59+
.progress_counter(format!("locking {:?} files..", ID::TYPE));
60+
p.set_length(ids.len().try_into().unwrap());
61+
let backend = &repo.be;
62+
pool.in_place_scope(|scope| {
63+
for id in ids {
64+
scope.spawn(move |_| {
65+
if let Err(e) = backend.lock(ID::TYPE, id, until) {
66+
// FIXME: Use error handling
67+
error!("lock failed for {:?} {id:?}. {e}", ID::TYPE);
68+
};
69+
p.inc(1);
70+
});
71+
}
72+
});
73+
p.finish();
74+
Ok(())
75+
}

crates/core/src/error.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -229,6 +229,8 @@ pub enum CommandErrorKind {
229229
NoKeepOption,
230230
/// {0:?}
231231
FromParseError(#[from] shell_words::ParseError),
232+
/// No locking capability configured for the backend
233+
NoLockingConfigured,
232234
}
233235

234236
/// [`CryptoErrorKind`] describes the errors that can happen while dealing with Cryptographic functions

0 commit comments

Comments
 (0)