Skip to content

Commit

Permalink
Add async support for Tokio runtime
Browse files Browse the repository at this point in the history
  • Loading branch information
ventaquil committed May 3, 2024
1 parent 9df4a46 commit a9e1db8
Show file tree
Hide file tree
Showing 6 changed files with 204 additions and 2 deletions.
8 changes: 8 additions & 0 deletions .cargo/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,14 @@ Alternatively, you can use the [`cargo add`](https://doc.rust-lang.org/cargo/com
cargo add chksum-core
```

## Features

### Asynchronous Runtime

* `async-runtime-tokio`: Enables async interface for Tokio runtime.

By default, neither of these features is enabled.

## Example Crates

For implementation-specific examples, refer to the source code of the following crates:
Expand Down
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

## [Unreleased]

### Added

- Added async support for Tokio runtime.

### Fixed

- Added missing method comments to improve documentation clarity.
Expand Down
9 changes: 9 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,5 +16,14 @@ all-features = true
rustdoc-args = ["--cfg", "docsrs"]

[dependencies]
async-trait = { version = "0.1.80", optional = true }
chksum-hash-core = "0.0.0"
futures-lite = { version = "2.2.0", optional = true }
thiserror = "1.0.51"
tokio = { version = "1.37.0", features = ["fs", "io-util", "io-std"], optional = true }

[features]
default = []

# async runtimes
async-runtime-tokio = ["async-trait", "tokio"]
8 changes: 8 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,14 @@ Alternatively, you can use the [`cargo add`](https://doc.rust-lang.org/cargo/com
cargo add chksum-core
```

## Features

### Asynchronous Runtime

* `async-runtime-tokio`: Enables async interface for Tokio runtime.

By default, neither of these features is enabled.

## Example Crates

For implementation-specific examples, refer to the source code of the following crates:
Expand Down
60 changes: 58 additions & 2 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,14 @@
//! cargo add chksum-core
//! ```
//!
//! # Features
//!
//! ## Asynchronous Runtime
//!
//! * `async-runtime-tokio`: Enables async interface for Tokio runtime.
//!
//! By default, neither of these features is enabled.
//!
//! # Example Crates
//!
//! For implementation-specific examples, refer to the source code of the following crates:
Expand All @@ -34,12 +42,16 @@
#![forbid(unsafe_code)]

mod error;
#[cfg(feature = "async-runtime-tokio")]
mod tokio;

use std::fmt::{Display, LowerHex, UpperHex};
use std::fs::{read_dir, DirEntry, File, ReadDir};
use std::io::{self, BufRead, BufReader, IsTerminal, Stdin, StdinLock};
use std::path::{Path, PathBuf};

#[cfg(feature = "async-runtime-tokio")]
use async_trait::async_trait;
#[doc(no_inline)]
pub use chksum_hash_core as hash;

Expand Down Expand Up @@ -70,6 +82,15 @@ where
data.chksum::<T>()
}

/// Computes the hash of the given input.
#[cfg(feature = "async-runtime-tokio")]
pub async fn async_chksum<T>(mut data: impl AsyncChksumable) -> Result<T::Digest>
where
T: Hash + Send,
{
data.chksum::<T>().await
}

/// A trait for hash digests.
pub trait Digest: Display {
/// Returns a byte slice of the digest's contents.
Expand Down Expand Up @@ -235,7 +256,7 @@ impl_chksumable!(PathBuf, &PathBuf, &mut PathBuf => {
where
H: Hash,
{
self.as_path().chksum_with(hash)
Chksumable::chksum_with(&mut self.as_path(), hash)
}
});

Expand Down Expand Up @@ -267,7 +288,7 @@ impl_chksumable!(DirEntry, &DirEntry, &mut DirEntry => {
where
H: Hash,
{
self.path().chksum_with(hash)
Chksumable::chksum_with(&mut self.path(), hash)
}
});

Expand Down Expand Up @@ -316,3 +337,38 @@ impl_chksumable!(StdinLock<'_>, &mut StdinLock<'_> => {
Ok(())
}
});

/// A trait for complex objects which must be processed chunk by chunk.
#[cfg(feature = "async-runtime-tokio")]
#[async_trait]
pub trait AsyncChksumable: Send {
/// Calculates the checksum of the object.
async fn chksum<H>(&mut self) -> Result<H::Digest>
where
H: Hash + Send,
{
let mut hash = H::default();
self.chksum_with(&mut hash).await?;
Ok(hash.digest())
}

/// Updates the given hash instance with the data from the object.
async fn chksum_with<H>(&mut self, hash: &mut H) -> Result<()>
where
H: Hash + Send;
}

#[cfg(feature = "async-runtime-tokio")]
#[async_trait]
impl<T> AsyncChksumable for T
where
T: Hashable + Send,
{
async fn chksum_with<H>(&mut self, hash: &mut H) -> Result<()>
where
H: Hash + Send,
{
self.hash_with(hash);
Ok(())
}
}
117 changes: 117 additions & 0 deletions src/tokio.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
use std::path::{Path, PathBuf};

use async_trait::async_trait;
use tokio::fs::{metadata, read_dir, DirEntry, File, ReadDir};
use tokio::io::{AsyncBufReadExt as _, BufReader, Stdin};

use crate::{AsyncChksumable, Hash, Hashable, Result};

macro_rules! impl_async_chksumable {
($($t:ty),+ => $i:tt) => {
$(
#[async_trait]
impl AsyncChksumable for $t $i
)*
};
}

impl_async_chksumable!(Path, &Path, &mut Path => {
async fn chksum_with<H>(&mut self, hash: &mut H) -> Result<()>
where
H: Hash + Send,
{
let metadata = metadata(&self).await?;
if metadata.is_dir() {
read_dir(self).await?.chksum_with(hash).await
} else {
// everything treat as a file when it is not a directory
File::open(self).await?.chksum_with(hash).await
}
}

});

impl_async_chksumable!(PathBuf, &PathBuf, &mut PathBuf => {
async fn chksum_with<H>(&mut self, hash: &mut H) -> Result<()>
where
H: Hash + Send,
{
self.as_path().chksum_with(hash).await
}
});

// TODO: missing `&File` implementation
impl_async_chksumable!(File, &mut File => {
async fn chksum_with<H>(&mut self, hash: &mut H) -> Result<()>
where
H: Hash + Send,
{
// TODO: tracking issue [tokio-rs/tokio#6407](github.com/tokio-rs/tokio/issues/6407)
// if self.is_terminal() {
// return Err(Error::IsTerminal);
// }

let mut reader = BufReader::new(self);
loop {
let buffer = reader.fill_buf().await?;
let length = buffer.len();
if length == 0 {
break;
}
buffer.hash_with(hash);
reader.consume(length);
}
Ok(())
}
});

impl_async_chksumable!(DirEntry, &DirEntry, &mut DirEntry => {
async fn chksum_with<H>(&mut self, hash: &mut H) -> Result<()>
where
H: Hash + Send,
{
self.path().chksum_with(hash).await
}
});

impl_async_chksumable!(ReadDir, &mut ReadDir => {
async fn chksum_with<H>(&mut self, hash: &mut H) -> Result<()>
where
H: Hash + Send,
{
let mut dir_entries = Vec::new();
while let Some(dir_entry) = self.next_entry().await? {
dir_entries.push(dir_entry);
}
dir_entries.sort_by_key(DirEntry::path);
for mut dir_entry in dir_entries {
dir_entry.chksum_with(hash).await?;
}
Ok(())
}
});

// TODO: missing `&Stdin` implementation
impl_async_chksumable!(Stdin, &mut Stdin => {
async fn chksum_with<H>(&mut self, hash: &mut H) -> Result<()>
where
H: Hash + Send,
{
// TODO: tracking issue [tokio-rs/tokio#6407](github.com/tokio-rs/tokio/issues/6407)
// if self.is_terminal() {
// return Err(Error::IsTerminal);
// }

let mut reader = BufReader::new(self);
loop {
let buffer = reader.fill_buf().await?;
let length = buffer.len();
if length == 0 {
break;
}
buffer.hash_with(hash);
reader.consume(length);
}
Ok(())
}
});

0 comments on commit a9e1db8

Please sign in to comment.