Skip to content

Commit

Permalink
Merge branch 'various-fixes'
Browse files Browse the repository at this point in the history
  • Loading branch information
Byron committed Mar 13, 2023
2 parents 531dd19 + a22621d commit cc0f506
Show file tree
Hide file tree
Showing 20 changed files with 377 additions and 266 deletions.
372 changes: 170 additions & 202 deletions Cargo.lock

Large diffs are not rendered by default.

1 change: 1 addition & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,7 @@ unit-tests: ## run all unit tests
&& cargo test --features blocking-network-client \
&& cargo test --features regex
cd gitoxide-core && cargo test --lib
# cd gix && cargo test --features async-network-client-async-std \ not being run as it's flaky even locally

nextest: ## run tests with `cargo nextest` (all unit-tests, no doc-tests, faster)
cargo nextest run --all
Expand Down
4 changes: 3 additions & 1 deletion crate-status.md
Original file line number Diff line number Diff line change
Expand Up @@ -638,11 +638,13 @@ See its [README.md](https://github.com/Byron/gitoxide/blob/main/gix-lock/README.
- [ ] respect `branch.<name>.merge` in the returned remote.
* **remotes**
* [x] clone
* [ ] shallow
* [x] shallow
* [ ] include-tags when shallow is used (needs separate fetch)
* [ ] prune non-existing shallow commits
* [ ] [bundles](https://git-scm.com/docs/git-bundle)
* [x] fetch
* [x] shallow (remains shallow, options to adjust shallow boundary)
* [ ] a way to auto-explode small packs to avoid them to pile up
* [ ] 'ref-in-want'
* [ ] standard negotiation algorithms (right now we only have a 'naive' one)
* [ ] push
Expand Down
10 changes: 7 additions & 3 deletions gix-packetline/src/read/async_io.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ use bstr::ByteSlice;
use futures_io::AsyncRead;
use futures_lite::AsyncReadExt;

use crate::read::ProgressAction;
use crate::{
decode,
read::{ExhaustiveOutcome, WithSidebands},
Expand Down Expand Up @@ -150,7 +151,8 @@ where
/// Same as [`as_read_with_sidebands(…)`][StreamingPeekableIter::as_read_with_sidebands()], but for channels without side band support.
///
/// Due to the preconfigured function type this method can be called without 'turbofish'.
pub fn as_read(&mut self) -> WithSidebands<'_, T, fn(bool, &[u8])> {
#[allow(clippy::type_complexity)]
pub fn as_read(&mut self) -> WithSidebands<'_, T, fn(bool, &[u8]) -> ProgressAction> {
WithSidebands::new(self)
}

Expand All @@ -161,7 +163,7 @@ where
/// being true in case the `text` is to be interpreted as error.
///
/// _Please note_ that side bands need to be negotiated with the server.
pub fn as_read_with_sidebands<F: FnMut(bool, &[u8]) + Unpin>(
pub fn as_read_with_sidebands<F: FnMut(bool, &[u8]) -> ProgressAction + Unpin>(
&mut self,
handle_progress: F,
) -> WithSidebands<'_, T, F> {
Expand All @@ -172,7 +174,9 @@ where
///
/// The type parameter `F` needs to be configured for this method to be callable using the 'turbofish' operator.
/// Use [`as_read()`][StreamingPeekableIter::as_read()].
pub fn as_read_without_sidebands<F: FnMut(bool, &[u8]) + Unpin>(&mut self) -> WithSidebands<'_, T, F> {
pub fn as_read_without_sidebands<F: FnMut(bool, &[u8]) -> ProgressAction + Unpin>(
&mut self,
) -> WithSidebands<'_, T, F> {
WithSidebands::without_progress_handler(self)
}
}
11 changes: 8 additions & 3 deletions gix-packetline/src/read/blocking_io.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ use std::io;

use bstr::ByteSlice;

use crate::read::ProgressAction;
use crate::{
decode,
read::{ExhaustiveOutcome, WithSidebands},
Expand Down Expand Up @@ -146,22 +147,26 @@ where
/// being true in case the `text` is to be interpreted as error.
///
/// _Please note_ that side bands need to be negotiated with the server.
pub fn as_read_with_sidebands<F: FnMut(bool, &[u8])>(&mut self, handle_progress: F) -> WithSidebands<'_, T, F> {
pub fn as_read_with_sidebands<F: FnMut(bool, &[u8]) -> ProgressAction>(
&mut self,
handle_progress: F,
) -> WithSidebands<'_, T, F> {
WithSidebands::with_progress_handler(self, handle_progress)
}

/// Same as [`as_read_with_sidebands(…)`][StreamingPeekableIter::as_read_with_sidebands()], but for channels without side band support.
///
/// The type parameter `F` needs to be configured for this method to be callable using the 'turbofish' operator.
/// Use [`as_read()`][StreamingPeekableIter::as_read()].
pub fn as_read_without_sidebands<F: FnMut(bool, &[u8])>(&mut self) -> WithSidebands<'_, T, F> {
pub fn as_read_without_sidebands<F: FnMut(bool, &[u8]) -> ProgressAction>(&mut self) -> WithSidebands<'_, T, F> {
WithSidebands::without_progress_handler(self)
}

/// Same as [`as_read_with_sidebands(…)`][StreamingPeekableIter::as_read_with_sidebands()], but for channels without side band support.
///
/// Due to the preconfigured function type this method can be called without 'turbofish'.
pub fn as_read(&mut self) -> WithSidebands<'_, T, fn(bool, &[u8])> {
#[allow(clippy::type_complexity)]
pub fn as_read(&mut self) -> WithSidebands<'_, T, fn(bool, &[u8]) -> ProgressAction> {
WithSidebands::new(self)
}
}
9 changes: 9 additions & 0 deletions gix-packetline/src/read/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,15 @@
use crate::MAX_LINE_LEN;
use crate::{PacketLineRef, StreamingPeekableIter, U16_HEX_BYTES};

/// Allow the read-progress handler to determine how to continue.
#[derive(Copy, Clone, Debug, Eq, PartialEq)]
pub enum ProgressAction {
/// Continue reading the next progress if available.
Continue,
/// Abort all IO even if more would be available, claiming the operation was interrupted.
Interrupt,
}

#[cfg(any(feature = "blocking-io", feature = "async-io"))]
type ExhaustiveOutcome<'a> = (
bool, // is_done
Expand Down
33 changes: 25 additions & 8 deletions gix-packetline/src/read/sidebands/async_io.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ use std::{
use futures_io::{AsyncBufRead, AsyncRead};
use futures_lite::ready;

use crate::read::ProgressAction;
use crate::{decode, BandRef, PacketLineRef, StreamingPeekableIter, TextRef, U16_HEX_BYTES};

type ReadLineResult<'a> = Option<std::io::Result<Result<PacketLineRef<'a>, decode::Error>>>;
Expand Down Expand Up @@ -37,7 +38,7 @@ where
}
}

impl<'a, T> WithSidebands<'a, T, fn(bool, &[u8])>
impl<'a, T> WithSidebands<'a, T, fn(bool, &[u8]) -> ProgressAction>
where
T: AsyncRead,
{
Expand Down Expand Up @@ -93,7 +94,7 @@ mod tests {
impl<'a, T, F> WithSidebands<'a, T, F>
where
T: AsyncRead + Unpin,
F: FnMut(bool, &[u8]) + Unpin,
F: FnMut(bool, &[u8]) -> ProgressAction + Unpin,
{
/// Create a new instance with the given `parent` provider and the `handle_progress` function.
///
Expand Down Expand Up @@ -201,7 +202,7 @@ pub struct ReadDataLineFuture<'a, 'b, T: AsyncRead, F> {
impl<'a, 'b, T, F> Future for ReadDataLineFuture<'a, 'b, T, F>
where
T: AsyncRead + Unpin,
F: FnMut(bool, &[u8]) + Unpin,
F: FnMut(bool, &[u8]) -> ProgressAction + Unpin,
{
type Output = std::io::Result<usize>;

Expand All @@ -228,7 +229,7 @@ pub struct ReadLineFuture<'a, 'b, T: AsyncRead, F> {
impl<'a, 'b, T, F> Future for ReadLineFuture<'a, 'b, T, F>
where
T: AsyncRead + Unpin,
F: FnMut(bool, &[u8]) + Unpin,
F: FnMut(bool, &[u8]) -> ProgressAction + Unpin,
{
type Output = std::io::Result<usize>;

Expand All @@ -251,7 +252,7 @@ where
impl<'a, T, F> AsyncBufRead for WithSidebands<'a, T, F>
where
T: AsyncRead + Unpin,
F: FnMut(bool, &[u8]) + Unpin,
F: FnMut(bool, &[u8]) -> ProgressAction + Unpin,
{
fn poll_fill_buf(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<std::io::Result<&[u8]>> {
use std::io;
Expand Down Expand Up @@ -310,11 +311,27 @@ where
}
BandRef::Progress(d) => {
let text = TextRef::from(d).0;
handle_progress(false, text);
match handle_progress(false, text) {
ProgressAction::Continue => {}
ProgressAction::Interrupt => {
return Poll::Ready(Err(io::Error::new(
std::io::ErrorKind::Other,
"interrupted by user",
)))
}
};
}
BandRef::Error(d) => {
let text = TextRef::from(d).0;
handle_progress(true, text);
match handle_progress(true, text) {
ProgressAction::Continue => {}
ProgressAction::Interrupt => {
return Poll::Ready(Err(io::Error::new(
io::ErrorKind::Other,
"interrupted by user",
)))
}
};
}
};
}
Expand Down Expand Up @@ -353,7 +370,7 @@ where
impl<'a, T, F> AsyncRead for WithSidebands<'a, T, F>
where
T: AsyncRead + Unpin,
F: FnMut(bool, &[u8]) + Unpin,
F: FnMut(bool, &[u8]) -> ProgressAction + Unpin,
{
fn poll_read(mut self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &mut [u8]) -> Poll<std::io::Result<usize>> {
let nread = {
Expand Down
29 changes: 23 additions & 6 deletions gix-packetline/src/read/sidebands/blocking_io.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use std::{io, io::BufRead};

use crate::read::ProgressAction;
use crate::{BandRef, PacketLineRef, StreamingPeekableIter, TextRef, U16_HEX_BYTES};

/// An implementor of [`BufRead`][io::BufRead] yielding packet lines on each call to [`read_line()`][io::BufRead::read_line()].
Expand All @@ -24,7 +25,7 @@ where
}
}

impl<'a, T> WithSidebands<'a, T, fn(bool, &[u8])>
impl<'a, T> WithSidebands<'a, T, fn(bool, &[u8]) -> ProgressAction>
where
T: io::Read,
{
Expand All @@ -42,7 +43,7 @@ where
impl<'a, T, F> WithSidebands<'a, T, F>
where
T: io::Read,
F: FnMut(bool, &[u8]),
F: FnMut(bool, &[u8]) -> ProgressAction,
{
/// Create a new instance with the given `parent` provider and the `handle_progress` function.
///
Expand Down Expand Up @@ -130,7 +131,7 @@ where
impl<'a, T, F> BufRead for WithSidebands<'a, T, F>
where
T: io::Read,
F: FnMut(bool, &[u8]),
F: FnMut(bool, &[u8]) -> ProgressAction,
{
fn fill_buf(&mut self) -> io::Result<&[u8]> {
if self.pos >= self.cap {
Expand All @@ -154,11 +155,27 @@ where
}
BandRef::Progress(d) => {
let text = TextRef::from(d).0;
handle_progress(false, text);
match handle_progress(false, text) {
ProgressAction::Continue => {}
ProgressAction::Interrupt => {
return Err(std::io::Error::new(
std::io::ErrorKind::Other,
"interrupted by user",
))
}
};
}
BandRef::Error(d) => {
let text = TextRef::from(d).0;
handle_progress(true, text);
match handle_progress(true, text) {
ProgressAction::Continue => {}
ProgressAction::Interrupt => {
return Err(std::io::Error::new(
std::io::ErrorKind::Other,
"interrupted by user",
))
}
};
}
};
}
Expand Down Expand Up @@ -189,7 +206,7 @@ where
impl<'a, T, F> io::Read for WithSidebands<'a, T, F>
where
T: io::Read,
F: FnMut(bool, &[u8]),
F: FnMut(bool, &[u8]) -> ProgressAction,
{
fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
let nread = {
Expand Down
8 changes: 5 additions & 3 deletions gix-packetline/tests/read/sideband.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use bstr::{BString, ByteSlice};
#[cfg(all(not(feature = "blocking-io"), feature = "async-io"))]
use futures_lite::io::AsyncReadExt;
use gix_odb::pack;
use gix_packetline::read::ProgressAction;
use gix_packetline::PacketLineRef;

use crate::read::streaming_peek_iter::fixture_bytes;
Expand Down Expand Up @@ -51,9 +52,10 @@ async fn read_pack_with_progress_extraction() -> crate::Result {
b"NAK".as_bstr()
);
let mut seen_texts = Vec::<BString>::new();
let mut do_nothing = |is_err: bool, data: &[u8]| {
let mut do_nothing = |is_err: bool, data: &[u8]| -> ProgressAction {
assert!(!is_err);
seen_texts.push(data.as_bstr().into());
ProgressAction::Continue
};
let pack_read = rd.as_read_with_sidebands(&mut do_nothing);
#[cfg(all(not(feature = "blocking-io"), feature = "async-io"))]
Expand Down Expand Up @@ -132,7 +134,7 @@ async fn read_line_trait_method_reads_one_packet_line_at_a_time() -> crate::Resu

drop(r);

let mut r = rd.as_read_with_sidebands(|_, _| ());
let mut r = rd.as_read_with_sidebands(|_, _| ProgressAction::Continue);
out.clear();
r.read_line_to_string(&mut out).await?;
assert_eq!(out, "&");
Expand Down Expand Up @@ -174,7 +176,7 @@ async fn readline_reads_one_packet_line_at_a_time() -> crate::Result {

drop(r);

let mut r = rd.as_read_with_sidebands(|_, _| ());
let mut r = rd.as_read_with_sidebands(|_, _| ProgressAction::Continue);
let line = r.read_data_line().await.unwrap()??.as_bstr().unwrap();
assert_eq!(
line.as_bstr(),
Expand Down
5 changes: 4 additions & 1 deletion gix-protocol/src/fetch_fn.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,8 @@ impl Default for FetchConnection {
/// _Note_ that depending on the `delegate`, the actual action performed can be `ls-refs`, `clone` or `fetch`.
#[allow(clippy::result_large_err)]
#[maybe_async]
// TODO: remove this without losing test coverage - we have the same but better in `gix` and it's
// not really worth it to maintain the delegates here.
pub async fn fetch<F, D, T, P>(
mut transport: T,
mut delegate: D,
Expand Down Expand Up @@ -162,7 +164,8 @@ where
reader.set_progress_handler(Some(Box::new({
let mut remote_progress = progress.add_child("remote");
move |is_err: bool, data: &[u8]| {
crate::RemoteProgress::translate_to_progress(is_err, data, &mut remote_progress)
crate::RemoteProgress::translate_to_progress(is_err, data, &mut remote_progress);
gix_transport::packetline::read::ProgressAction::Continue
}
}) as gix_transport::client::HandleProgress));
}
9 changes: 7 additions & 2 deletions gix-protocol/tests/fetch/response.rs
Original file line number Diff line number Diff line change
Expand Up @@ -193,6 +193,7 @@ mod v2 {

#[cfg(feature = "async-client")]
use futures_lite::io::AsyncReadExt;
use gix_packetline::read::ProgressAction;
use gix_protocol::fetch::{
self,
response::{Acknowledgement, ShallowUpdate},
Expand All @@ -213,7 +214,9 @@ mod v2 {
let r = fetch::Response::from_line_reader(Protocol::V2, &mut reader).await?;
assert!(r.acknowledgements().is_empty(), "it should go straight to the packfile");
assert!(r.has_pack());
reader.set_progress_handler(Some(Box::new(|_is_err, _text| ())));
reader.set_progress_handler(Some(Box::new(|_is_err, _text| {
gix_transport::packetline::read::ProgressAction::Continue
})));
let mut buf = Vec::new();
let bytes_read = reader.read_to_end(&mut buf).await?;
assert_eq!(bytes_read, 876, "should be able to read the whole pack");
Expand Down Expand Up @@ -295,6 +298,7 @@ mod v2 {
let mut buf = Vec::new();
reader.set_progress_handler(Some(Box::new(|is_err: bool, _data: &[u8]| {
assert!(!is_err, "fixture does not have an error");
ProgressAction::Continue
}) as gix_transport::client::HandleProgress));
let bytes_read = reader.read_to_end(&mut buf).await?;
assert_eq!(bytes_read, 1643, "should be able to read the whole pack");
Expand Down Expand Up @@ -346,7 +350,8 @@ mod v2 {
assert!(r.has_pack());
let mut buf = Vec::new();
reader.set_progress_handler(Some(Box::new(|a: bool, b: &[u8]| {
gix_protocol::RemoteProgress::translate_to_progress(a, b, &mut gix_features::progress::Discard)
gix_protocol::RemoteProgress::translate_to_progress(a, b, &mut gix_features::progress::Discard);
ProgressAction::Continue
}) as gix_transport::client::HandleProgress));
let bytes_read = reader.read_to_end(&mut buf).await?;
assert_eq!(bytes_read, 5360, "should be able to read the whole pack");
Expand Down
Loading

0 comments on commit cc0f506

Please sign in to comment.