diff --git a/.gitattributes b/.gitattributes index 21a8a6960ba..c3e94f4dbf1 100644 --- a/.gitattributes +++ b/.gitattributes @@ -1,5 +1,8 @@ **/generated-archives/*.tar.xz filter=lfs-disabled diff=lfs merge=lfs -text -# assure line feeds don't interfere with our working copy hash +# assure line feeds don't interfere with our working copy hash **/tests/fixtures/**/*.sh text crlf=input eol=lf /justfile text crlf=input eol=lf + +# have GitHub treat the gix-packetline-blocking src copy as auto-generated +gix-packetline-blocking/src/ linguist-generated=true diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index e2901d1a930..285afbbb5e9 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -176,7 +176,7 @@ jobs: # Let's not fail CI for this, it will fail locally often enough, and a crate a little bigger # than allows is no problem either if it comes to that. just check-size || true - + cargo-deny: runs-on: ubuntu-latest strategy: @@ -193,6 +193,7 @@ jobs: - uses: EmbarkStudios/cargo-deny-action@v1 with: command: check ${{ matrix.checks }} + wasm: name: WebAssembly runs-on: ubuntu-latest @@ -213,3 +214,32 @@ jobs: name: crates with 'wasm' feature - run: cd gix-pack && cargo build --all-features --target ${{ matrix.target }} name: gix-pack with all features (including wasm) + + check-packetline: + strategy: + fail-fast: false + matrix: + os: + - ubuntu-latest + # We consider this script read-only and its effect is the same everywhere. + # However, when changes are made to `etc/copy-packetline.sh`, re-enable the other platforms for testing. + # - macos-latest + # - windows-latest + runs-on: ${{ matrix.os }} + defaults: + run: + shell: bash + steps: + - uses: actions/checkout@v4 + - name: Check that working tree is initially clean + run: | + set -x + git status + git diff --exit-code + - name: Regenerate gix-packetline-blocking/src + run: etc/copy-packetline.sh + - name: Check that gix-packetline-blocking/src was already up to date + run: | + set -x + git status + git diff --exit-code diff --git a/etc/copy-packetline.sh b/etc/copy-packetline.sh new file mode 100755 index 00000000000..9520b3b9011 --- /dev/null +++ b/etc/copy-packetline.sh @@ -0,0 +1,152 @@ +#!/bin/bash + +set -euC -o pipefail + +readonly input_dir='gix-packetline/src' +readonly output_parent_dir='gix-packetline-blocking' +readonly output_dir="$output_parent_dir/src" + +function fail () { + printf '%s: error: %s\n' "$0" "$1" >&2 + exit 1 +} + +function chdir_toplevel () { + local root_padded root + + # Find the working tree's root. (Padding covers the trailing-newline case.) + root_padded="$(git rev-parse --show-toplevel && echo -n .)" || + fail 'git-rev-parse failed to find top-level dir' + root="${root_padded%$'\n.'}" + + cd -- "$root" +} + +function merging () { + local git_dir_padded git_dir + + # Find the .git directory. (Padding covers the trailing-newline case.) + git_dir_padded="$(git rev-parse --git-dir && echo -n .)" || + fail 'git-rev-parse failed to find git dir' + git_dir="${git_dir_padded%$'\n.'}" + + test -e "$git_dir/MERGE_HEAD" +} + +function output_dir_status () { + git status --porcelain --ignored=traditional -- "$output_dir" || + fail 'git-status failed' +} + +function check_output_dir () { + if ! test -e "$output_dir"; then + # The destination does not exist on disk, so nothing will be lost. Proceed. + return + fi + + if merging; then + # In a merge, it would be confusing to replace anything at the destination. 
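+    # (`grep -q '^'` matches any line at all, so any git-status output for the output location bails out here.)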
+ if output_dir_status | grep -q '^'; then + fail 'output location exists, and a merge is in progress' + fi + else + # We can lose data if anything of value at the destination is not in the + # index. (This includes unstaged deletions, for two reasons. We could lose + # track of which files had been deleted. More importantly, replacing a + # staged symlink or regular file with an unstaged directory is shown by + # git-status as only a deletion, even if the directory is non-empty.) + if output_dir_status | grep -q '^.[^ ]'; then + fail 'output location exists, with unstaged changes or ignored files' + fi + fi +} + +function first_line_ends_crlf () { + # This is tricky to check portably. In Cygwin-like environments including + # MSYS2 and Git Bash, most text processing tools, including awk, sed, and + # grep, automatically ignore \r before \n. Some ignore \r everywhere. Some + # can be told to keep \r, but in non-portable ways that may affect other + # implementations. Bash ignores \r in some places even without "-o igncr", + # and ignores \r even more with it, including in all text from command + # substitution. Simple checks may be non-portable to other OSes. Fortunately, + # tools that treat input as binary data are exempt (even cat, but "-v" is + # non-portable, and unreliable in general because lines can end in "^M"). + # This may be doable without od, by using tr more heavily, but it could be + # hard to avoid false positives with unexpected characters or \r without \n. + + head -n 1 -- "$1" | # Get the longest prefix with no non-trailing \n byte. + od -An -ta | # Represent all bytes symbolically, without addresses. + tr -sd '\n' ' ' | # Scrunch into one line, so "cr nl" appears as such. + grep -q 'cr nl$' # Check if the result signifies a \r\n line ending. +} + +function make_header () { + local input_file endline + + input_file="$1" + endline="$2" + + # shellcheck disable=SC2016 # The backticks are intentionally literal. + printf '// DO NOT EDIT - this is a copy of %s. Run `just copy-packetline` to update it.%s%s' \ + "$input_file" "$endline" "$endline" +} + +function copy_with_header () { + local input_file output_file endline + + input_file="$1" + output_file="$2" + + if first_line_ends_crlf "$input_file"; then + endline=$'\r\n' + else + endline=$'\n' + fi + + make_header "$input_file" "$endline" | cat -- - "$input_file" >"$output_file" +} + +function generate_one () { + local input_file output_file + + input_file="$1" + output_file="$output_dir${input_file#"$input_dir"}" + + if test -d "$input_file"; then + mkdir -p -- "$output_file" + elif test -L "$input_file"; then + # Cover this case separately, for more useful error messages. + fail "input file is symbolic link: $input_file" + elif ! test -f "$input_file"; then + # This covers less common kinds of files we can't or shouldn't process. + fail "input file neither regular file nor directory: $input_file" + elif [[ "$input_file" =~ \.rs$ ]]; then + copy_with_header "$input_file" "$output_file" + else + fail "input file not named as Rust source code: $input_file" + fi +} + +function generate_all () { + local input_file + + if ! test -d "$input_dir"; then + fail "no input directory: $input_dir" + fi + if ! test -d "$output_parent_dir"; then + fail "no output parent directory: $output_parent_dir" + fi + check_output_dir + + rm -rf -- "$output_dir" # It may be a directory, symlink, or regular file. 
+ if test -e "$output_dir"; then + fail 'unable to remove output location' + fi + + find "$input_dir" -print0 | while IFS= read -r -d '' input_file; do + generate_one "$input_file" + done +} + +chdir_toplevel +generate_all diff --git a/gix-packetline-blocking/src b/gix-packetline-blocking/src deleted file mode 120000 index a92fc2d976d..00000000000 --- a/gix-packetline-blocking/src +++ /dev/null @@ -1 +0,0 @@ -../gix-packetline/src \ No newline at end of file diff --git a/gix-packetline-blocking/src/decode.rs b/gix-packetline-blocking/src/decode.rs new file mode 100644 index 00000000000..cde0a58beed --- /dev/null +++ b/gix-packetline-blocking/src/decode.rs @@ -0,0 +1,148 @@ +// DO NOT EDIT - this is a copy of gix-packetline/src/decode.rs. Run `just copy-packetline` to update it. + +use bstr::BString; + +use crate::{PacketLineRef, DELIMITER_LINE, FLUSH_LINE, MAX_DATA_LEN, MAX_LINE_LEN, RESPONSE_END_LINE, U16_HEX_BYTES}; + +/// The error used in the [`decode`][mod@crate::decode] module +#[derive(Debug, thiserror::Error)] +#[allow(missing_docs)] +pub enum Error { + #[error("Failed to decode the first four hex bytes indicating the line length: {err}")] + HexDecode { err: String }, + #[error("The data received claims to be larger than the maximum allowed size: got {length_in_bytes}, exceeds {MAX_DATA_LEN}")] + DataLengthLimitExceeded { length_in_bytes: usize }, + #[error("Received an invalid empty line")] + DataIsEmpty, + #[error("Received an invalid line of length 3")] + InvalidLineLength, + #[error("{data:?} - consumed {bytes_consumed} bytes")] + Line { data: BString, bytes_consumed: usize }, + #[error("Needing {bytes_needed} additional bytes to decode the line successfully")] + NotEnoughData { bytes_needed: usize }, +} + +/// +#[allow(clippy::empty_docs)] +pub mod band { + /// The error used in [`PacketLineRef::decode_band()`][super::PacketLineRef::decode_band()]. + #[derive(Debug, thiserror::Error)] + #[allow(missing_docs)] + pub enum Error { + #[error("attempt to decode a non-side channel line or input was malformed: {band_id}")] + InvalidSideBand { band_id: u8 }, + #[error("attempt to decode a non-data line into a side-channel band")] + NonDataLine, + } +} + +/// A utility return type to support incremental parsing of packet lines. +#[derive(Debug, Clone)] +pub enum Stream<'a> { + /// Indicate a single packet line was parsed completely + Complete { + /// The parsed packet line + line: PacketLineRef<'a>, + /// The amount of bytes consumed from input + bytes_consumed: usize, + }, + /// A packet line could not yet be parsed due to missing bytes + Incomplete { + /// The amount of additional bytes needed for the parsing to complete + bytes_needed: usize, + }, +} + +/// The result of [`hex_prefix()`] indicating either a special packet line or the amount of wanted bytes +pub enum PacketLineOrWantedSize<'a> { + /// The special kind of packet line decoded from the hex prefix. It never contains actual data. + Line(PacketLineRef<'a>), + /// The amount of bytes indicated by the hex prefix of the packet line. + Wanted(u16), +} + +/// Decode the `four_bytes` packet line prefix provided in hexadecimal form and check it for validity. 
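+///
+/// For example, `b"0000"` is the flush line, while `b"001e"` announces 30 bytes in total,
+/// i.e. 26 bytes of data following the 4 hex prefix bytes.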
+pub fn hex_prefix(four_bytes: &[u8]) -> Result, Error> { + debug_assert_eq!(four_bytes.len(), 4, "need four hex bytes"); + for (line_bytes, line_type) in &[ + (FLUSH_LINE, PacketLineRef::Flush), + (DELIMITER_LINE, PacketLineRef::Delimiter), + (RESPONSE_END_LINE, PacketLineRef::ResponseEnd), + ] { + if four_bytes == *line_bytes { + return Ok(PacketLineOrWantedSize::Line(*line_type)); + } + } + + let mut buf = [0u8; U16_HEX_BYTES / 2]; + faster_hex::hex_decode(four_bytes, &mut buf).map_err(|err| Error::HexDecode { err: err.to_string() })?; + let wanted_bytes = u16::from_be_bytes(buf); + + if wanted_bytes == 3 { + return Err(Error::InvalidLineLength); + } + if wanted_bytes == 4 { + return Err(Error::DataIsEmpty); + } + debug_assert!( + wanted_bytes as usize > U16_HEX_BYTES, + "by now there should be more wanted bytes than prefix bytes" + ); + Ok(PacketLineOrWantedSize::Wanted(wanted_bytes - U16_HEX_BYTES as u16)) +} + +/// Obtain a `PacketLine` from `data` after assuring `data` is small enough to fit. +pub fn to_data_line(data: &[u8]) -> Result, Error> { + if data.len() > MAX_LINE_LEN { + return Err(Error::DataLengthLimitExceeded { + length_in_bytes: data.len(), + }); + } + + Ok(PacketLineRef::Data(data)) +} + +/// Decode `data` as packet line while reporting whether the data is complete or not using a [`Stream`]. +pub fn streaming(data: &[u8]) -> Result, Error> { + let data_len = data.len(); + if data_len < U16_HEX_BYTES { + return Ok(Stream::Incomplete { + bytes_needed: U16_HEX_BYTES - data_len, + }); + } + let wanted_bytes = match hex_prefix(&data[..U16_HEX_BYTES])? { + PacketLineOrWantedSize::Wanted(s) => s as usize, + PacketLineOrWantedSize::Line(line) => { + return Ok(Stream::Complete { + line, + bytes_consumed: 4, + }) + } + } + U16_HEX_BYTES; + if wanted_bytes > MAX_LINE_LEN { + return Err(Error::DataLengthLimitExceeded { + length_in_bytes: wanted_bytes, + }); + } + if data_len < wanted_bytes { + return Ok(Stream::Incomplete { + bytes_needed: wanted_bytes - data_len, + }); + } + + Ok(Stream::Complete { + line: to_data_line(&data[U16_HEX_BYTES..wanted_bytes])?, + bytes_consumed: wanted_bytes, + }) +} + +/// Decode an entire packet line from data or fail. +/// +/// Note that failure also happens if there is not enough data to parse a complete packet line, as opposed to [`streaming()`] decoding +/// succeeds in that case, stating how much more bytes are required. +pub fn all_at_once(data: &[u8]) -> Result, Error> { + match streaming(data)? { + Stream::Complete { line, .. } => Ok(line), + Stream::Incomplete { bytes_needed } => Err(Error::NotEnoughData { bytes_needed }), + } +} diff --git a/gix-packetline-blocking/src/encode/async_io.rs b/gix-packetline-blocking/src/encode/async_io.rs new file mode 100644 index 00000000000..384ec856f5e --- /dev/null +++ b/gix-packetline-blocking/src/encode/async_io.rs @@ -0,0 +1,215 @@ +// DO NOT EDIT - this is a copy of gix-packetline/src/encode/async_io.rs. Run `just copy-packetline` to update it. + +use std::{ + io, + pin::Pin, + task::{Context, Poll}, +}; + +use futures_io::AsyncWrite; +use futures_lite::AsyncWriteExt; + +use super::u16_to_hex; +use crate::{encode::Error, Channel, DELIMITER_LINE, ERR_PREFIX, FLUSH_LINE, MAX_DATA_LEN, RESPONSE_END_LINE}; + +pin_project_lite::pin_project! { + /// A way of writing packet lines asynchronously. 
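+    ///
+    /// Each write produces one packet line: a 4 byte hex length, followed by `prefix`, the data, and `suffix`.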
+ pub struct LineWriter<'a, W> { + #[pin] + pub(crate) writer: W, + pub(crate) prefix: &'a [u8], + pub(crate) suffix: &'a [u8], + state: State<'a>, + } +} + +enum State<'a> { + Idle, + WriteHexLen([u8; 4], usize), + WritePrefix(&'a [u8]), + WriteData(usize), + WriteSuffix(&'a [u8]), +} + +impl<'a, W: AsyncWrite + Unpin> LineWriter<'a, W> { + /// Create a new line writer writing data with a `prefix` and `suffix`. + /// + /// Keep the additional `prefix` or `suffix` buffers empty if no prefix or suffix should be written. + pub fn new(writer: W, prefix: &'a [u8], suffix: &'a [u8]) -> Self { + LineWriter { + writer, + prefix, + suffix, + state: State::Idle, + } + } + + /// Consume self and reveal the inner writer. + pub fn into_inner(self) -> W { + self.writer + } +} + +fn into_io_err(err: Error) -> io::Error { + io::Error::new(io::ErrorKind::Other, err) +} + +impl AsyncWrite for LineWriter<'_, W> { + fn poll_write(self: Pin<&mut Self>, cx: &mut Context<'_>, data: &[u8]) -> Poll> { + use futures_lite::ready; + let mut this = self.project(); + loop { + match &mut this.state { + State::Idle => { + let data_len = this.prefix.len() + data.len() + this.suffix.len(); + if data_len > MAX_DATA_LEN { + return Poll::Ready(Err(into_io_err(Error::DataLengthLimitExceeded { + length_in_bytes: data_len, + }))); + } + if data.is_empty() { + return Poll::Ready(Err(into_io_err(Error::DataIsEmpty))); + } + let data_len = data_len + 4; + let len_buf = u16_to_hex(data_len as u16); + *this.state = State::WriteHexLen(len_buf, 0) + } + State::WriteHexLen(hex_len, written) => { + while *written != hex_len.len() { + let n = ready!(this.writer.as_mut().poll_write(cx, &hex_len[*written..]))?; + if n == 0 { + return Poll::Ready(Err(io::ErrorKind::WriteZero.into())); + } + *written += n; + } + if this.prefix.is_empty() { + *this.state = State::WriteData(0) + } else { + *this.state = State::WritePrefix(this.prefix) + } + } + State::WritePrefix(buf) => { + while !buf.is_empty() { + let n = ready!(this.writer.as_mut().poll_write(cx, buf))?; + if n == 0 { + return Poll::Ready(Err(io::ErrorKind::WriteZero.into())); + } + let (_, rest) = std::mem::take(buf).split_at(n); + *buf = rest; + } + *this.state = State::WriteData(0) + } + State::WriteData(written) => { + while *written != data.len() { + let n = ready!(this.writer.as_mut().poll_write(cx, &data[*written..]))?; + if n == 0 { + return Poll::Ready(Err(io::ErrorKind::WriteZero.into())); + } + *written += n; + } + if this.suffix.is_empty() { + let written = 4 + this.prefix.len() + *written; + *this.state = State::Idle; + return Poll::Ready(Ok(written)); + } else { + *this.state = State::WriteSuffix(this.suffix) + } + } + State::WriteSuffix(buf) => { + while !buf.is_empty() { + let n = ready!(this.writer.as_mut().poll_write(cx, buf))?; + if n == 0 { + return Poll::Ready(Err(io::ErrorKind::WriteZero.into())); + } + let (_, rest) = std::mem::take(buf).split_at(n); + *buf = rest; + } + *this.state = State::Idle; + return Poll::Ready(Ok(4 + this.prefix.len() + data.len() + this.suffix.len())); + } + } + } + } + + fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + let this = self.project(); + this.writer.poll_flush(cx) + } + + fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + let this = self.project(); + this.writer.poll_close(cx) + } +} + +async fn prefixed_and_suffixed_data_to_write( + prefix: &[u8], + data: &[u8], + suffix: &[u8], + mut out: impl AsyncWrite + Unpin, +) -> io::Result { + let data_len = prefix.len() + data.len() + 
suffix.len(); + if data_len > MAX_DATA_LEN { + return Err(into_io_err(Error::DataLengthLimitExceeded { + length_in_bytes: data_len, + })); + } + if data.is_empty() { + return Err(into_io_err(Error::DataIsEmpty)); + } + + let data_len = data_len + 4; + let buf = u16_to_hex(data_len as u16); + + out.write_all(&buf).await?; + if !prefix.is_empty() { + out.write_all(prefix).await?; + } + out.write_all(data).await?; + if !suffix.is_empty() { + out.write_all(suffix).await?; + } + Ok(data_len) +} + +async fn prefixed_data_to_write(prefix: &[u8], data: &[u8], out: impl AsyncWrite + Unpin) -> io::Result { + prefixed_and_suffixed_data_to_write(prefix, data, &[], out).await +} + +/// Write a `text` message to `out`, which is assured to end in a newline. +pub async fn text_to_write(text: &[u8], out: impl AsyncWrite + Unpin) -> io::Result { + prefixed_and_suffixed_data_to_write(&[], text, &[b'\n'], out).await +} + +/// Write a `data` message to `out`. +pub async fn data_to_write(data: &[u8], out: impl AsyncWrite + Unpin) -> io::Result { + prefixed_data_to_write(&[], data, out).await +} + +/// Write an error `message` to `out`. +pub async fn error_to_write(message: &[u8], out: impl AsyncWrite + Unpin) -> io::Result { + prefixed_data_to_write(ERR_PREFIX, message, out).await +} + +/// Write a response-end message to `out`. +pub async fn response_end_to_write(mut out: impl AsyncWrite + Unpin) -> io::Result { + out.write_all(RESPONSE_END_LINE).await?; + Ok(4) +} + +/// Write a delim message to `out`. +pub async fn delim_to_write(mut out: impl AsyncWrite + Unpin) -> io::Result { + out.write_all(DELIMITER_LINE).await?; + Ok(4) +} + +/// Write a flush message to `out`. +pub async fn flush_to_write(mut out: impl AsyncWrite + Unpin) -> io::Result { + out.write_all(FLUSH_LINE).await?; + Ok(4) +} + +/// Write `data` of `kind` to `out` using side-band encoding. +pub async fn band_to_write(kind: Channel, data: &[u8], out: impl AsyncWrite + Unpin) -> io::Result { + prefixed_data_to_write(&[kind as u8], data, out).await +} diff --git a/gix-packetline-blocking/src/encode/blocking_io.rs b/gix-packetline-blocking/src/encode/blocking_io.rs new file mode 100644 index 00000000000..370ab992204 --- /dev/null +++ b/gix-packetline-blocking/src/encode/blocking_io.rs @@ -0,0 +1,78 @@ +// DO NOT EDIT - this is a copy of gix-packetline/src/encode/blocking_io.rs. Run `just copy-packetline` to update it. + +use std::io; + +use super::u16_to_hex; +use crate::{encode::Error, Channel, DELIMITER_LINE, ERR_PREFIX, FLUSH_LINE, MAX_DATA_LEN, RESPONSE_END_LINE}; + +/// Write a response-end message to `out`. +pub fn response_end_to_write(mut out: impl io::Write) -> io::Result { + out.write_all(RESPONSE_END_LINE).map(|_| 4) +} + +/// Write a delim message to `out`. +pub fn delim_to_write(mut out: impl io::Write) -> io::Result { + out.write_all(DELIMITER_LINE).map(|_| 4) +} + +/// Write a flush message to `out`. +pub fn flush_to_write(mut out: impl io::Write) -> io::Result { + out.write_all(FLUSH_LINE).map(|_| 4) +} + +/// Write an error `message` to `out`. +pub fn error_to_write(message: &[u8], out: impl io::Write) -> io::Result { + prefixed_data_to_write(ERR_PREFIX, message, out) +} + +/// Write `data` of `kind` to `out` using side-band encoding. +pub fn band_to_write(kind: Channel, data: &[u8], out: impl io::Write) -> io::Result { + prefixed_data_to_write(&[kind as u8], data, out) +} + +/// Write a `data` message to `out`. 
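+///
+/// For example, `data_to_write(b"hello", &mut out)` writes the 9 bytes `0009hello` and returns 9.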
+pub fn data_to_write(data: &[u8], out: impl io::Write) -> io::Result { + prefixed_data_to_write(&[], data, out) +} + +/// Write a `text` message to `out`, which is assured to end in a newline. +pub fn text_to_write(text: &[u8], out: impl io::Write) -> io::Result { + prefixed_and_suffixed_data_to_write(&[], text, &[b'\n'], out) +} + +fn prefixed_data_to_write(prefix: &[u8], data: &[u8], out: impl io::Write) -> io::Result { + prefixed_and_suffixed_data_to_write(prefix, data, &[], out) +} + +fn prefixed_and_suffixed_data_to_write( + prefix: &[u8], + data: &[u8], + suffix: &[u8], + mut out: impl io::Write, +) -> io::Result { + let data_len = prefix.len() + data.len() + suffix.len(); + if data_len > MAX_DATA_LEN { + return Err(io::Error::new( + io::ErrorKind::Other, + Error::DataLengthLimitExceeded { + length_in_bytes: data_len, + }, + )); + } + if data.is_empty() { + return Err(io::Error::new(io::ErrorKind::Other, Error::DataIsEmpty)); + } + + let data_len = data_len + 4; + let buf = u16_to_hex(data_len as u16); + + out.write_all(&buf)?; + if !prefix.is_empty() { + out.write_all(prefix)?; + } + out.write_all(data)?; + if !suffix.is_empty() { + out.write_all(suffix)?; + } + Ok(data_len) +} diff --git a/gix-packetline-blocking/src/encode/mod.rs b/gix-packetline-blocking/src/encode/mod.rs new file mode 100644 index 00000000000..238957cc4ca --- /dev/null +++ b/gix-packetline-blocking/src/encode/mod.rs @@ -0,0 +1,29 @@ +// DO NOT EDIT - this is a copy of gix-packetline/src/encode/mod.rs. Run `just copy-packetline` to update it. + +use crate::MAX_DATA_LEN; + +/// The error returned by most functions in the [`encode`][crate::encode] module +#[derive(Debug, thiserror::Error)] +#[allow(missing_docs)] +pub enum Error { + #[error("Cannot encode more than {MAX_DATA_LEN} bytes, got {length_in_bytes}")] + DataLengthLimitExceeded { length_in_bytes: usize }, + #[error("Empty lines are invalid")] + DataIsEmpty, +} + +#[cfg(all(not(feature = "blocking-io"), feature = "async-io"))] +mod async_io; +#[cfg(all(not(feature = "blocking-io"), feature = "async-io"))] +pub use async_io::*; + +#[cfg(feature = "blocking-io")] +mod blocking_io; +#[cfg(feature = "blocking-io")] +pub use blocking_io::*; + +pub(crate) fn u16_to_hex(value: u16) -> [u8; 4] { + let mut buf = [0u8; 4]; + faster_hex::hex_encode(&value.to_be_bytes(), &mut buf).expect("two bytes to 4 hex chars never fails"); + buf +} diff --git a/gix-packetline-blocking/src/lib.rs b/gix-packetline-blocking/src/lib.rs new file mode 100644 index 00000000000..8b7ae4479d5 --- /dev/null +++ b/gix-packetline-blocking/src/lib.rs @@ -0,0 +1,110 @@ +// DO NOT EDIT - this is a copy of gix-packetline/src/lib.rs. Run `just copy-packetline` to update it. + +//! Read and write the git packet line wire format without copying it. +//! +//! For reading the packet line format use the [`StreamingPeekableIter`], and for writing the [`Writer`]. +//! 
## Feature Flags +#![cfg_attr( + all(doc, all(doc, feature = "document-features")), + doc = ::document_features::document_features!() +)] +#![cfg_attr(all(doc, feature = "document-features"), feature(doc_cfg, doc_auto_cfg))] +#![deny(missing_docs, rust_2018_idioms, unsafe_code)] + +const U16_HEX_BYTES: usize = 4; +const MAX_DATA_LEN: usize = 65516; +const MAX_LINE_LEN: usize = MAX_DATA_LEN + U16_HEX_BYTES; +const FLUSH_LINE: &[u8] = b"0000"; +const DELIMITER_LINE: &[u8] = b"0001"; +const RESPONSE_END_LINE: &[u8] = b"0002"; +const ERR_PREFIX: &[u8] = b"ERR "; + +/// One of three side-band types allowing to multiplex information over a single connection. +#[derive(PartialEq, Eq, Debug, Hash, Ord, PartialOrd, Clone, Copy)] +#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))] +pub enum Channel { + /// The usable data itself in any format. + Data = 1, + /// Progress information in a user-readable format. + Progress = 2, + /// Error information in a user readable format. Receiving it usually terminates the connection. + Error = 3, +} + +mod line; +/// +#[allow(clippy::empty_docs)] +pub mod read; + +/// +#[allow(clippy::empty_docs)] +#[cfg(any(feature = "async-io", feature = "blocking-io"))] +mod write; +#[cfg(all(not(feature = "blocking-io"), feature = "async-io"))] +pub use write::async_io::Writer; +#[cfg(feature = "blocking-io")] +pub use write::blocking_io::Writer; + +/// A borrowed packet line as it refers to a slice of data by reference. +#[derive(PartialEq, Eq, Debug, Hash, Ord, PartialOrd, Clone, Copy)] +#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))] +pub enum PacketLineRef<'a> { + /// A chunk of raw data. + Data(&'a [u8]), + /// A flush packet. + Flush, + /// A delimiter packet. + Delimiter, + /// The end of the response. + ResponseEnd, +} + +/// A packet line representing an Error in a side-band channel. +#[derive(PartialEq, Eq, Debug, Hash, Ord, PartialOrd, Clone, Copy)] +#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))] +pub struct ErrorRef<'a>(pub &'a [u8]); + +/// A packet line representing text, which may include a trailing newline. +#[derive(PartialEq, Eq, Debug, Hash, Ord, PartialOrd, Clone, Copy)] +#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))] +pub struct TextRef<'a>(pub &'a [u8]); + +/// A band in a side-band channel. +#[derive(PartialEq, Eq, Debug, Hash, Ord, PartialOrd, Clone, Copy)] +#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))] +pub enum BandRef<'a> { + /// A band carrying data. + Data(&'a [u8]), + /// A band carrying user readable progress information. + Progress(&'a [u8]), + /// A band carrying user readable errors. + Error(&'a [u8]), +} + +/// Read pack lines one after another, without consuming more than needed from the underlying +/// [`Read`][std::io::Read]. [`Flush`][PacketLineRef::Flush] lines cause the reader to stop producing lines forever, +/// leaving [`Read`][std::io::Read] at the start of whatever comes next. +/// +/// This implementation tries hard not to allocate at all which leads to quite some added complexity and plenty of extra memory copies. 
+pub struct StreamingPeekableIter { + read: T, + peek_buf: Vec, + #[cfg(any(feature = "blocking-io", feature = "async-io"))] + buf: Vec, + fail_on_err_lines: bool, + delimiters: &'static [PacketLineRef<'static>], + is_done: bool, + stopped_at: Option>, + #[cfg_attr(all(not(feature = "async-io"), not(feature = "blocking-io")), allow(dead_code))] + trace: bool, +} + +/// Utilities to help decoding packet lines +pub mod decode; +#[doc(inline)] +pub use decode::all_at_once as decode; +/// Utilities to encode different kinds of packet lines +pub mod encode; + +#[cfg(all(feature = "async-io", feature = "blocking-io"))] +compile_error!("Cannot set both 'blocking-io' and 'async-io' features as they are mutually exclusive"); diff --git a/gix-packetline-blocking/src/line/async_io.rs b/gix-packetline-blocking/src/line/async_io.rs new file mode 100644 index 00000000000..3a631c44c9a --- /dev/null +++ b/gix-packetline-blocking/src/line/async_io.rs @@ -0,0 +1,49 @@ +// DO NOT EDIT - this is a copy of gix-packetline/src/line/async_io.rs. Run `just copy-packetline` to update it. + +use std::io; + +use futures_io::AsyncWrite; + +use crate::{encode, BandRef, Channel, ErrorRef, PacketLineRef, TextRef}; + +impl<'a> BandRef<'a> { + /// Serialize this instance to `out`, returning the amount of bytes written. + /// + /// The data written to `out` can be decoded with [`Borrowed::decode_band()]`. + pub async fn write_to(&self, out: impl AsyncWrite + Unpin) -> io::Result { + match self { + BandRef::Data(d) => encode::band_to_write(Channel::Data, d, out), + BandRef::Progress(d) => encode::band_to_write(Channel::Progress, d, out), + BandRef::Error(d) => encode::band_to_write(Channel::Error, d, out), + } + .await + } +} + +impl<'a> TextRef<'a> { + /// Serialize this instance to `out`, appending a newline if there is none, returning the amount of bytes written. + pub async fn write_to(&self, out: impl AsyncWrite + Unpin) -> io::Result { + encode::text_to_write(self.0, out).await + } +} + +impl<'a> ErrorRef<'a> { + /// Serialize this line as error to `out`. + /// + /// This includes a marker to allow decoding it outside of a side-band channel, returning the amount of bytes written. + pub async fn write_to(&self, out: impl AsyncWrite + Unpin) -> io::Result { + encode::error_to_write(self.0, out).await + } +} + +impl<'a> PacketLineRef<'a> { + /// Serialize this instance to `out` in git `packetline` format, returning the amount of bytes written to `out`. + pub async fn write_to(&self, out: impl AsyncWrite + Unpin) -> io::Result { + match self { + PacketLineRef::Data(d) => encode::data_to_write(d, out).await, + PacketLineRef::Flush => encode::flush_to_write(out).await, + PacketLineRef::Delimiter => encode::delim_to_write(out).await, + PacketLineRef::ResponseEnd => encode::response_end_to_write(out).await, + } + } +} diff --git a/gix-packetline-blocking/src/line/blocking_io.rs b/gix-packetline-blocking/src/line/blocking_io.rs new file mode 100644 index 00000000000..0354e3de77a --- /dev/null +++ b/gix-packetline-blocking/src/line/blocking_io.rs @@ -0,0 +1,46 @@ +// DO NOT EDIT - this is a copy of gix-packetline/src/line/blocking_io.rs. Run `just copy-packetline` to update it. + +use std::io; + +use crate::{encode, BandRef, Channel, ErrorRef, PacketLineRef, TextRef}; + +impl<'a> BandRef<'a> { + /// Serialize this instance to `out`, returning the amount of bytes written. + /// + /// The data written to `out` can be decoded with [`Borrowed::decode_band()]`. 
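+    ///
+    /// The channel is encoded as a single byte right after the 4 byte length prefix.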
+ pub fn write_to(&self, out: impl io::Write) -> io::Result { + match self { + BandRef::Data(d) => encode::band_to_write(Channel::Data, d, out), + BandRef::Progress(d) => encode::band_to_write(Channel::Progress, d, out), + BandRef::Error(d) => encode::band_to_write(Channel::Error, d, out), + } + } +} + +impl<'a> TextRef<'a> { + /// Serialize this instance to `out`, appending a newline if there is none, returning the amount of bytes written. + pub fn write_to(&self, out: impl io::Write) -> io::Result { + encode::text_to_write(self.0, out) + } +} + +impl<'a> ErrorRef<'a> { + /// Serialize this line as error to `out`. + /// + /// This includes a marker to allow decoding it outside of a side-band channel, returning the amount of bytes written. + pub fn write_to(&self, out: impl io::Write) -> io::Result { + encode::error_to_write(self.0, out) + } +} + +impl<'a> PacketLineRef<'a> { + /// Serialize this instance to `out` in git `packetline` format, returning the amount of bytes written to `out`. + pub fn write_to(&self, out: impl io::Write) -> io::Result { + match self { + PacketLineRef::Data(d) => encode::data_to_write(d, out), + PacketLineRef::Flush => encode::flush_to_write(out), + PacketLineRef::Delimiter => encode::delim_to_write(out), + PacketLineRef::ResponseEnd => encode::response_end_to_write(out), + } + } +} diff --git a/gix-packetline-blocking/src/line/mod.rs b/gix-packetline-blocking/src/line/mod.rs new file mode 100644 index 00000000000..d584f0d3a8e --- /dev/null +++ b/gix-packetline-blocking/src/line/mod.rs @@ -0,0 +1,90 @@ +// DO NOT EDIT - this is a copy of gix-packetline/src/line/mod.rs. Run `just copy-packetline` to update it. + +use bstr::BStr; + +use crate::{decode, BandRef, Channel, ErrorRef, PacketLineRef, TextRef, ERR_PREFIX}; + +impl<'a> PacketLineRef<'a> { + /// Return this instance as slice if it's [`Data`][PacketLineRef::Data]. + pub fn as_slice(&self) -> Option<&'a [u8]> { + match self { + PacketLineRef::Data(d) => Some(d), + PacketLineRef::Flush | PacketLineRef::Delimiter | PacketLineRef::ResponseEnd => None, + } + } + /// Return this instance's [`as_slice()`][PacketLineRef::as_slice()] as [`BStr`]. + pub fn as_bstr(&self) -> Option<&'a BStr> { + self.as_slice().map(Into::into) + } + /// Interpret this instance's [`as_slice()`][PacketLineRef::as_slice()] as [`ErrorRef`]. + /// + /// This works for any data received in an error [channel][crate::Channel]. + /// + /// Note that this creates an unchecked error using the slice verbatim, which is useful to [serialize it][ErrorRef::write_to()]. + /// See [`check_error()`][PacketLineRef::check_error()] for a version that assures the error information is in the expected format. + pub fn as_error(&self) -> Option> { + self.as_slice().map(ErrorRef) + } + /// Check this instance's [`as_slice()`][PacketLineRef::as_slice()] is a valid [`ErrorRef`] and return it. + /// + /// This works for any data received in an error [channel][crate::Channel]. + pub fn check_error(&self) -> Option> { + self.as_slice().and_then(|data| { + if data.len() >= ERR_PREFIX.len() && &data[..ERR_PREFIX.len()] == ERR_PREFIX { + Some(ErrorRef(&data[ERR_PREFIX.len()..])) + } else { + None + } + }) + } + /// Return this instance as text, with the trailing newline truncated if present. + pub fn as_text(&self) -> Option> { + self.as_slice().map(Into::into) + } + + /// Interpret the data in this [`slice`][PacketLineRef::as_slice()] as [`BandRef`] according to the given `kind` of channel. + /// + /// Note that this is only relevant in a side-band channel. 
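+    /// The `kind` is not validated against the data; it only determines how the slice is labelled.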
+ /// See [`decode_band()`][PacketLineRef::decode_band()] in case `kind` is unknown. + pub fn as_band(&self, kind: Channel) -> Option> { + self.as_slice().map(|d| match kind { + Channel::Data => BandRef::Data(d), + Channel::Progress => BandRef::Progress(d), + Channel::Error => BandRef::Error(d), + }) + } + + /// Decode the band of this [`slice`][PacketLineRef::as_slice()] + pub fn decode_band(&self) -> Result, decode::band::Error> { + let d = self.as_slice().ok_or(decode::band::Error::NonDataLine)?; + Ok(match d[0] { + 1 => BandRef::Data(&d[1..]), + 2 => BandRef::Progress(&d[1..]), + 3 => BandRef::Error(&d[1..]), + band => return Err(decode::band::Error::InvalidSideBand { band_id: band }), + }) + } +} + +impl<'a> From<&'a [u8]> for TextRef<'a> { + fn from(d: &'a [u8]) -> Self { + let d = if d[d.len() - 1] == b'\n' { &d[..d.len() - 1] } else { d }; + TextRef(d) + } +} + +impl<'a> TextRef<'a> { + /// Return this instance's data. + pub fn as_slice(&self) -> &'a [u8] { + self.0 + } + /// Return this instance's data as [`BStr`]. + pub fn as_bstr(&self) -> &'a BStr { + self.0.into() + } +} + +#[cfg(all(not(feature = "blocking-io"), feature = "async-io"))] +mod async_io; +#[cfg(feature = "blocking-io")] +mod blocking_io; diff --git a/gix-packetline-blocking/src/read/async_io.rs b/gix-packetline-blocking/src/read/async_io.rs new file mode 100644 index 00000000000..a0d347d84c7 --- /dev/null +++ b/gix-packetline-blocking/src/read/async_io.rs @@ -0,0 +1,201 @@ +// DO NOT EDIT - this is a copy of gix-packetline/src/read/async_io.rs. Run `just copy-packetline` to update it. + +use std::io; + +use bstr::ByteSlice; +use futures_io::AsyncRead; +use futures_lite::AsyncReadExt; + +use crate::{ + decode, + read::{ExhaustiveOutcome, ProgressAction, WithSidebands}, + PacketLineRef, StreamingPeekableIter, MAX_LINE_LEN, U16_HEX_BYTES, +}; + +/// Non-IO methods +impl StreamingPeekableIter +where + T: AsyncRead + Unpin, +{ + #[allow(clippy::needless_lifetimes)] // TODO: remove once this is clippy false positive is fixed + async fn read_line_inner<'a>( + reader: &mut T, + buf: &'a mut [u8], + ) -> io::Result, decode::Error>> { + let (hex_bytes, data_bytes) = buf.split_at_mut(4); + reader.read_exact(hex_bytes).await?; + let num_data_bytes = match decode::hex_prefix(hex_bytes) { + Ok(decode::PacketLineOrWantedSize::Line(line)) => return Ok(Ok(line)), + Ok(decode::PacketLineOrWantedSize::Wanted(additional_bytes)) => additional_bytes as usize, + Err(err) => return Ok(Err(err)), + }; + + let (data_bytes, _) = data_bytes.split_at_mut(num_data_bytes); + reader.read_exact(data_bytes).await?; + match decode::to_data_line(data_bytes) { + Ok(line) => Ok(Ok(line)), + Err(err) => Ok(Err(err)), + } + } + + /// This function is needed to help the borrow checker allow us to return references all the time + /// It contains a bunch of logic shared between peek and `read_line` invocations. 
+ async fn read_line_inner_exhaustive<'a>( + reader: &mut T, + buf: &'a mut Vec, + delimiters: &[PacketLineRef<'static>], + fail_on_err_lines: bool, + buf_resize: bool, + trace: bool, + ) -> ExhaustiveOutcome<'a> { + ( + false, + None, + Some(match Self::read_line_inner(reader, buf).await { + Ok(Ok(line)) => { + if trace { + match line { + #[allow(unused_variables)] + PacketLineRef::Data(d) => { + gix_trace::trace!("<< {}", d.as_bstr().trim().as_bstr()); + } + PacketLineRef::Flush => { + gix_trace::trace!("<< FLUSH"); + } + PacketLineRef::Delimiter => { + gix_trace::trace!("<< DELIM"); + } + PacketLineRef::ResponseEnd => { + gix_trace::trace!("<< RESPONSE_END"); + } + } + } + if delimiters.contains(&line) { + let stopped_at = delimiters.iter().find(|l| **l == line).copied(); + buf.clear(); + return (true, stopped_at, None); + } else if fail_on_err_lines { + if let Some(err) = line.check_error() { + let err = err.0.as_bstr().to_owned(); + buf.clear(); + return ( + true, + None, + Some(Err(io::Error::new( + io::ErrorKind::Other, + crate::read::Error { message: err }, + ))), + ); + } + } + let len = line.as_slice().map_or(U16_HEX_BYTES, |s| s.len() + U16_HEX_BYTES); + if buf_resize { + buf.resize(len, 0); + } + Ok(Ok(crate::decode(buf).expect("only valid data here"))) + } + Ok(Err(err)) => { + buf.clear(); + Ok(Err(err)) + } + Err(err) => { + buf.clear(); + Err(err) + } + }), + ) + } + + /// Read a packet line into the internal buffer and return it. + /// + /// Returns `None` if the end of iteration is reached because of one of the following: + /// + /// * natural EOF + /// * ERR packet line encountered if [`fail_on_err_lines()`][StreamingPeekableIter::fail_on_err_lines()] is true. + /// * A `delimiter` packet line encountered + pub async fn read_line(&mut self) -> Option, decode::Error>>> { + if self.is_done { + return None; + } + if !self.peek_buf.is_empty() { + std::mem::swap(&mut self.peek_buf, &mut self.buf); + self.peek_buf.clear(); + Some(Ok(Ok(crate::decode(&self.buf).expect("only valid data in peek buf")))) + } else { + if self.buf.len() != MAX_LINE_LEN { + self.buf.resize(MAX_LINE_LEN, 0); + } + let (is_done, stopped_at, res) = Self::read_line_inner_exhaustive( + &mut self.read, + &mut self.buf, + self.delimiters, + self.fail_on_err_lines, + false, + self.trace, + ) + .await; + self.is_done = is_done; + self.stopped_at = stopped_at; + res + } + } + + /// Peek the next packet line without consuming it. Returns `None` if a stop-packet or an error + /// was encountered. + /// + /// Multiple calls to peek will return the same packet line, if there is one. + pub async fn peek_line(&mut self) -> Option, decode::Error>>> { + if self.is_done { + return None; + } + if self.peek_buf.is_empty() { + self.peek_buf.resize(MAX_LINE_LEN, 0); + let (is_done, stopped_at, res) = Self::read_line_inner_exhaustive( + &mut self.read, + &mut self.peek_buf, + self.delimiters, + self.fail_on_err_lines, + true, + self.trace, + ) + .await; + self.is_done = is_done; + self.stopped_at = stopped_at; + res + } else { + Some(Ok(Ok(crate::decode(&self.peek_buf).expect("only valid data here")))) + } + } + + /// Same as [`as_read_with_sidebands(…)`][StreamingPeekableIter::as_read_with_sidebands()], but for channels without side band support. + /// + /// Due to the preconfigured function type this method can be called without 'turbofish'. 
+ #[allow(clippy::type_complexity)] + pub fn as_read(&mut self) -> WithSidebands<'_, T, fn(bool, &[u8]) -> ProgressAction> { + WithSidebands::new(self) + } + + /// Return this instance as implementor of [`Read`][io::Read] assuming side bands to be used in all received packet lines. + /// Each invocation of [`read_line()`][io::BufRead::read_line()] returns a packet line. + /// + /// Progress or error information will be passed to the given `handle_progress(is_error, text)` function, with `is_error: bool` + /// being true in case the `text` is to be interpreted as error. + /// + /// _Please note_ that side bands need to be negotiated with the server. + pub fn as_read_with_sidebands ProgressAction + Unpin>( + &mut self, + handle_progress: F, + ) -> WithSidebands<'_, T, F> { + WithSidebands::with_progress_handler(self, handle_progress) + } + + /// Same as [`as_read_with_sidebands(…)`][StreamingPeekableIter::as_read_with_sidebands()], but for channels without side band support. + /// + /// The type parameter `F` needs to be configured for this method to be callable using the 'turbofish' operator. + /// Use [`as_read()`][StreamingPeekableIter::as_read()]. + pub fn as_read_without_sidebands ProgressAction + Unpin>( + &mut self, + ) -> WithSidebands<'_, T, F> { + WithSidebands::without_progress_handler(self) + } +} diff --git a/gix-packetline-blocking/src/read/blocking_io.rs b/gix-packetline-blocking/src/read/blocking_io.rs new file mode 100644 index 00000000000..ce2035a1a3e --- /dev/null +++ b/gix-packetline-blocking/src/read/blocking_io.rs @@ -0,0 +1,192 @@ +// DO NOT EDIT - this is a copy of gix-packetline/src/read/blocking_io.rs. Run `just copy-packetline` to update it. + +use std::io; + +use bstr::ByteSlice; + +use crate::{ + decode, + read::{ExhaustiveOutcome, ProgressAction, WithSidebands}, + PacketLineRef, StreamingPeekableIter, MAX_LINE_LEN, U16_HEX_BYTES, +}; + +/// Non-IO methods +impl StreamingPeekableIter +where + T: io::Read, +{ + fn read_line_inner<'a>(reader: &mut T, buf: &'a mut [u8]) -> io::Result, decode::Error>> { + let (hex_bytes, data_bytes) = buf.split_at_mut(4); + reader.read_exact(hex_bytes)?; + let num_data_bytes = match decode::hex_prefix(hex_bytes) { + Ok(decode::PacketLineOrWantedSize::Line(line)) => return Ok(Ok(line)), + Ok(decode::PacketLineOrWantedSize::Wanted(additional_bytes)) => additional_bytes as usize, + Err(err) => return Ok(Err(err)), + }; + + let (data_bytes, _) = data_bytes.split_at_mut(num_data_bytes); + reader.read_exact(data_bytes)?; + match decode::to_data_line(data_bytes) { + Ok(line) => Ok(Ok(line)), + Err(err) => Ok(Err(err)), + } + } + + /// This function is needed to help the borrow checker allow us to return references all the time + /// It contains a bunch of logic shared between peek and `read_line` invocations. 
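+    /// The returned tuple is `(is_done, stopped_at, result)`, matching the `ExhaustiveOutcome` alias.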
+ fn read_line_inner_exhaustive<'a>( + reader: &mut T, + buf: &'a mut Vec, + delimiters: &[PacketLineRef<'static>], + fail_on_err_lines: bool, + buf_resize: bool, + trace: bool, + ) -> ExhaustiveOutcome<'a> { + ( + false, + None, + Some(match Self::read_line_inner(reader, buf) { + Ok(Ok(line)) => { + if trace { + match line { + #[allow(unused_variables)] + PacketLineRef::Data(d) => { + gix_trace::trace!("<< {}", d.as_bstr().trim().as_bstr()); + } + PacketLineRef::Flush => { + gix_trace::trace!("<< FLUSH"); + } + PacketLineRef::Delimiter => { + gix_trace::trace!("<< DELIM"); + } + PacketLineRef::ResponseEnd => { + gix_trace::trace!("<< RESPONSE_END"); + } + } + } + if delimiters.contains(&line) { + let stopped_at = delimiters.iter().find(|l| **l == line).copied(); + buf.clear(); + return (true, stopped_at, None); + } else if fail_on_err_lines { + if let Some(err) = line.check_error() { + let err = err.0.as_bstr().to_owned(); + buf.clear(); + return ( + true, + None, + Some(Err(io::Error::new( + io::ErrorKind::Other, + crate::read::Error { message: err }, + ))), + ); + } + } + let len = line.as_slice().map_or(U16_HEX_BYTES, |s| s.len() + U16_HEX_BYTES); + if buf_resize { + buf.resize(len, 0); + } + // TODO(borrowchk): remove additional decoding of internal buffer which is needed only to make it past borrowchk + Ok(Ok(crate::decode(buf).expect("only valid data here"))) + } + Ok(Err(err)) => { + buf.clear(); + Ok(Err(err)) + } + Err(err) => { + buf.clear(); + Err(err) + } + }), + ) + } + + /// Read a packet line into the internal buffer and return it. + /// + /// Returns `None` if the end of iteration is reached because of one of the following: + /// + /// * natural EOF + /// * ERR packet line encountered if [`fail_on_err_lines()`][StreamingPeekableIter::fail_on_err_lines()] is true. + /// * A `delimiter` packet line encountered + pub fn read_line(&mut self) -> Option, decode::Error>>> { + if self.is_done { + return None; + } + if !self.peek_buf.is_empty() { + std::mem::swap(&mut self.peek_buf, &mut self.buf); + self.peek_buf.clear(); + Some(Ok(Ok(crate::decode(&self.buf).expect("only valid data in peek buf")))) + } else { + if self.buf.len() != MAX_LINE_LEN { + self.buf.resize(MAX_LINE_LEN, 0); + } + let (is_done, stopped_at, res) = Self::read_line_inner_exhaustive( + &mut self.read, + &mut self.buf, + self.delimiters, + self.fail_on_err_lines, + false, + self.trace, + ); + self.is_done = is_done; + self.stopped_at = stopped_at; + res + } + } + + /// Peek the next packet line without consuming it. Returns `None` if a stop-packet or an error + /// was encountered. + /// + /// Multiple calls to peek will return the same packet line, if there is one. + pub fn peek_line(&mut self) -> Option, decode::Error>>> { + if self.is_done { + return None; + } + if self.peek_buf.is_empty() { + self.peek_buf.resize(MAX_LINE_LEN, 0); + let (is_done, stopped_at, res) = Self::read_line_inner_exhaustive( + &mut self.read, + &mut self.peek_buf, + self.delimiters, + self.fail_on_err_lines, + true, + self.trace, + ); + self.is_done = is_done; + self.stopped_at = stopped_at; + res + } else { + Some(Ok(Ok(crate::decode(&self.peek_buf).expect("only valid data here")))) + } + } + + /// Return this instance as implementor of [`Read`][io::Read] assuming side bands to be used in all received packet lines. + /// Each invocation of [`read_line()`][io::BufRead::read_line()] returns a packet line. 
+ /// + /// Progress or error information will be passed to the given `handle_progress(is_error, text)` function, with `is_error: bool` + /// being true in case the `text` is to be interpreted as error. + /// + /// _Please note_ that side bands need to be negotiated with the server. + pub fn as_read_with_sidebands ProgressAction>( + &mut self, + handle_progress: F, + ) -> WithSidebands<'_, T, F> { + WithSidebands::with_progress_handler(self, handle_progress) + } + + /// Same as [`as_read_with_sidebands(…)`][StreamingPeekableIter::as_read_with_sidebands()], but for channels without side band support. + /// + /// The type parameter `F` needs to be configured for this method to be callable using the 'turbofish' operator. + /// Use [`as_read()`][StreamingPeekableIter::as_read()]. + pub fn as_read_without_sidebands ProgressAction>(&mut self) -> WithSidebands<'_, T, F> { + WithSidebands::without_progress_handler(self) + } + + /// Same as [`as_read_with_sidebands(…)`][StreamingPeekableIter::as_read_with_sidebands()], but for channels without side band support. + /// + /// Due to the preconfigured function type this method can be called without 'turbofish'. + #[allow(clippy::type_complexity)] + pub fn as_read(&mut self) -> WithSidebands<'_, T, fn(bool, &[u8]) -> ProgressAction> { + WithSidebands::new(self) + } +} diff --git a/gix-packetline-blocking/src/read/mod.rs b/gix-packetline-blocking/src/read/mod.rs new file mode 100644 index 00000000000..386fa492b80 --- /dev/null +++ b/gix-packetline-blocking/src/read/mod.rs @@ -0,0 +1,130 @@ +// DO NOT EDIT - this is a copy of gix-packetline/src/read/mod.rs. Run `just copy-packetline` to update it. + +#[cfg(any(feature = "blocking-io", feature = "async-io"))] +use crate::MAX_LINE_LEN; +use crate::{PacketLineRef, StreamingPeekableIter, U16_HEX_BYTES}; + +/// Allow the read-progress handler to determine how to continue. +#[derive(Copy, Clone, Debug, Eq, PartialEq)] +pub enum ProgressAction { + /// Continue reading the next progress if available. + Continue, + /// Abort all IO even if more would be available, claiming the operation was interrupted. + Interrupt, +} + +#[cfg(any(feature = "blocking-io", feature = "async-io"))] +type ExhaustiveOutcome<'a> = ( + bool, // is_done + Option>, // stopped_at + Option, crate::decode::Error>>>, // actual method result +); + +mod error { + use std::fmt::{Debug, Display, Formatter}; + + use bstr::BString; + + /// The error representing an ERR packet line, as possibly wrapped into an `std::io::Error` in + /// [`read_line(…)`][super::StreamingPeekableIter::read_line()]. + #[derive(Debug)] + pub struct Error { + /// The contents of the ERR line, with `ERR` portion stripped. + pub message: BString, + } + + impl Display for Error { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + Display::fmt(&self.message, f) + } + } + + impl std::error::Error for Error {} +} +pub use error::Error; + +impl StreamingPeekableIter { + /// Return a new instance from `read` which will stop decoding packet lines when receiving one of the given `delimiters`. + /// If `trace` is `true`, all packetlines received or sent will be passed to the facilities of the `gix-trace` crate. 
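+    ///
+    /// For instance, `StreamingPeekableIter::new(reader, &[PacketLineRef::Flush], false)` yields lines
+    /// until a flush line is encountered.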
+ pub fn new(read: T, delimiters: &'static [PacketLineRef<'static>], trace: bool) -> Self { + StreamingPeekableIter { + read, + #[cfg(any(feature = "blocking-io", feature = "async-io"))] + buf: vec![0; MAX_LINE_LEN], + peek_buf: Vec::new(), + delimiters, + fail_on_err_lines: false, + is_done: false, + stopped_at: None, + trace, + } + } + + /// Modify the peek buffer, overwriting the byte at `position` with the given byte to `replace_with` while truncating + /// it to contain only bytes until the newly replaced `position`. + /// + /// This is useful if you would want to remove 'special bytes' hidden behind, say a NULL byte to disappear and allow + /// standard line readers to read the next line as usual. + /// + /// **Note** that `position` does not include the 4 bytes prefix (they are invisible outside the reader) + pub fn peek_buffer_replace_and_truncate(&mut self, position: usize, replace_with: u8) { + let position = position + U16_HEX_BYTES; + self.peek_buf[position] = replace_with; + + let new_len = position + 1; + self.peek_buf.truncate(new_len); + self.peek_buf[..4].copy_from_slice(&crate::encode::u16_to_hex((new_len) as u16)); + } + + /// Returns the packet line that stopped the iteration, or + /// `None` if the end wasn't reached yet, on EOF, or if [`fail_on_err_lines()`][StreamingPeekableIter::fail_on_err_lines()] was true. + pub fn stopped_at(&self) -> Option> { + self.stopped_at + } + + /// Reset all iteration state allowing to continue a stopped iteration that is not yet at EOF. + /// + /// This can happen once a delimiter is reached. + pub fn reset(&mut self) { + let delimiters = std::mem::take(&mut self.delimiters); + self.reset_with(delimiters); + } + + /// Similar to [`reset()`][StreamingPeekableIter::reset()] with support to changing the `delimiters`. + pub fn reset_with(&mut self, delimiters: &'static [PacketLineRef<'static>]) { + self.delimiters = delimiters; + self.is_done = false; + self.stopped_at = None; + } + + /// If `value` is `true` the provider will check for special `ERR` packet lines and stop iteration when one is encountered. + /// + /// Use [`stopped_at()]`[`StreamingPeekableIter::stopped_at()`] to inspect the cause of the end of the iteration. + /// ne + pub fn fail_on_err_lines(&mut self, value: bool) { + self.fail_on_err_lines = value; + } + + /// Replace the reader used with the given `read`, resetting all other iteration state as well. + pub fn replace(&mut self, read: T) -> T { + let prev = std::mem::replace(&mut self.read, read); + self.reset(); + self.fail_on_err_lines = false; + prev + } + + /// Return the inner read + pub fn into_inner(self) -> T { + self.read + } +} + +#[cfg(feature = "blocking-io")] +mod blocking_io; + +#[cfg(all(not(feature = "blocking-io"), feature = "async-io"))] +mod async_io; + +mod sidebands; +#[cfg(any(feature = "blocking-io", feature = "async-io"))] +pub use sidebands::WithSidebands; diff --git a/gix-packetline-blocking/src/read/sidebands/async_io.rs b/gix-packetline-blocking/src/read/sidebands/async_io.rs new file mode 100644 index 00000000000..0582ea11ed8 --- /dev/null +++ b/gix-packetline-blocking/src/read/sidebands/async_io.rs @@ -0,0 +1,383 @@ +// DO NOT EDIT - this is a copy of gix-packetline/src/read/sidebands/async_io.rs. Run `just copy-packetline` to update it. 
+ +use std::{ + future::Future, + pin::Pin, + task::{Context, Poll}, +}; + +use futures_io::{AsyncBufRead, AsyncRead}; +use futures_lite::ready; + +use crate::{decode, read::ProgressAction, BandRef, PacketLineRef, StreamingPeekableIter, TextRef, U16_HEX_BYTES}; + +type ReadLineResult<'a> = Option, decode::Error>>>; +/// An implementor of [`AsyncBufRead`] yielding packet lines on each call to [`read_line()`][AsyncBufRead::read_line()]. +/// It's also possible to hide the underlying packet lines using the [`Read`][AsyncRead] implementation which is useful +/// if they represent binary data, like the one of a pack file. +pub struct WithSidebands<'a, T, F> +where + T: AsyncRead, +{ + state: State<'a, T>, + handle_progress: Option, + pos: usize, + cap: usize, +} + +impl<'a, T, F> Drop for WithSidebands<'a, T, F> +where + T: AsyncRead, +{ + fn drop(&mut self) { + if let State::Idle { ref mut parent } = self.state { + parent + .as_mut() + .expect("parent is always available if we are idle") + .reset(); + } + } +} + +impl<'a, T> WithSidebands<'a, T, fn(bool, &[u8]) -> ProgressAction> +where + T: AsyncRead, +{ + /// Create a new instance with the given provider as `parent`. + pub fn new(parent: &'a mut StreamingPeekableIter) -> Self { + WithSidebands { + state: State::Idle { parent: Some(parent) }, + handle_progress: None, + pos: 0, + cap: 0, + } + } +} + +enum State<'a, T> { + Idle { + parent: Option<&'a mut StreamingPeekableIter>, + }, + ReadLine { + read_line: Pin> + 'a>>, + parent_inactive: Option<*mut StreamingPeekableIter>, + }, +} + +/// # SAFETY +/// It's safe because T is `Send` and we have a test that assures that our `StreamingPeekableIter` is `Send` as well, +/// hence the `*mut _` is `Send`. +/// `read_line` isn't send and we can't declare it as such as it forces `Send` in all places (BUT WHY IS THAT A PROBLEM, I don't recall). +/// However, it's only used when pinned and thus isn't actually sent anywhere, it's a secondary state of the future used after it was Send +/// to a thread possibly. +// TODO: Is it possible to declare it as it should be? +#[allow(unsafe_code, clippy::non_send_fields_in_send_ty)] +unsafe impl<'a, T> Send for State<'a, T> where T: Send {} + +impl<'a, T, F> WithSidebands<'a, T, F> +where + T: AsyncRead + Unpin, + F: FnMut(bool, &[u8]) -> ProgressAction + Unpin, +{ + /// Create a new instance with the given `parent` provider and the `handle_progress` function. + /// + /// Progress or error information will be passed to the given `handle_progress(is_error, text)` function, with `is_error: bool` + /// being true in case the `text` is to be interpreted as error. + pub fn with_progress_handler(parent: &'a mut StreamingPeekableIter, handle_progress: F) -> Self { + WithSidebands { + state: State::Idle { parent: Some(parent) }, + handle_progress: Some(handle_progress), + pos: 0, + cap: 0, + } + } + + /// Create a new instance without a progress handler. 
+ pub fn without_progress_handler(parent: &'a mut StreamingPeekableIter) -> Self { + WithSidebands { + state: State::Idle { parent: Some(parent) }, + handle_progress: None, + pos: 0, + cap: 0, + } + } + + /// Forwards to the parent [`StreamingPeekableIter::reset_with()`] + pub fn reset_with(&mut self, delimiters: &'static [PacketLineRef<'static>]) { + if let State::Idle { ref mut parent } = self.state { + parent + .as_mut() + .expect("parent is always available if we are idle") + .reset_with(delimiters) + } + } + + /// Forwards to the parent [`StreamingPeekableIter::stopped_at()`] + pub fn stopped_at(&self) -> Option> { + match self.state { + State::Idle { ref parent } => { + parent + .as_ref() + .expect("parent is always available if we are idle") + .stopped_at + } + _ => None, + } + } + + /// Set or unset the progress handler. + pub fn set_progress_handler(&mut self, handle_progress: Option) { + self.handle_progress = handle_progress; + } + + /// Effectively forwards to the parent [`StreamingPeekableIter::peek_line()`], allowing to see what would be returned + /// next on a call to [`read_line()`][io::BufRead::read_line()]. + /// + /// # Warning + /// + /// This skips all sideband handling and may return an unprocessed line with sidebands still contained in it. + pub async fn peek_data_line(&mut self) -> Option>> { + match self.state { + State::Idle { ref mut parent } => match parent + .as_mut() + .expect("parent is always available if we are idle") + .peek_line() + .await + { + Some(Ok(Ok(PacketLineRef::Data(line)))) => Some(Ok(Ok(line))), + Some(Ok(Err(err))) => Some(Ok(Err(err))), + Some(Err(err)) => Some(Err(err)), + _ => None, + }, + _ => None, + } + } + + /// Read a packet line as string line. + pub fn read_line_to_string<'b>(&'b mut self, buf: &'b mut String) -> ReadLineFuture<'a, 'b, T, F> { + ReadLineFuture { parent: self, buf } + } + + /// Read a packet line from the underlying packet reader, returning empty lines if a stop-packetline was reached. + /// + /// # Warning + /// + /// This skips all sideband handling and may return an unprocessed line with sidebands still contained in it. 
+    pub async fn read_data_line(&mut self) -> Option<std::io::Result<Result<PacketLineRef<'_>, decode::Error>>> {
+        match &mut self.state {
+            State::Idle { parent: Some(parent) } => {
+                assert_eq!(
+                    self.cap, 0,
+                    "we don't support partial buffers right now - read-line must be used consistently"
+                );
+                parent.read_line().await
+            }
+            _ => None,
+        }
+    }
+}
+
+pub struct ReadDataLineFuture<'a, 'b, T: AsyncRead, F> {
+    parent: &'b mut WithSidebands<'a, T, F>,
+    buf: &'b mut Vec<u8>,
+}
+
+impl<'a, 'b, T, F> Future for ReadDataLineFuture<'a, 'b, T, F>
+where
+    T: AsyncRead + Unpin,
+    F: FnMut(bool, &[u8]) -> ProgressAction + Unpin,
+{
+    type Output = std::io::Result<usize>;
+
+    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
+        assert_eq!(
+            self.parent.cap, 0,
+            "we don't support partial buffers right now - read-line must be used consistently"
+        );
+        let Self { buf, parent } = &mut *self;
+        let line = ready!(Pin::new(parent).poll_fill_buf(cx))?;
+        buf.clear();
+        buf.extend_from_slice(line);
+        let bytes = line.len();
+        self.parent.cap = 0;
+        Poll::Ready(Ok(bytes))
+    }
+}
+
+pub struct ReadLineFuture<'a, 'b, T: AsyncRead, F> {
+    parent: &'b mut WithSidebands<'a, T, F>,
+    buf: &'b mut String,
+}
+
+impl<'a, 'b, T, F> Future for ReadLineFuture<'a, 'b, T, F>
+where
+    T: AsyncRead + Unpin,
+    F: FnMut(bool, &[u8]) -> ProgressAction + Unpin,
+{
+    type Output = std::io::Result<usize>;
+
+    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
+        assert_eq!(
+            self.parent.cap, 0,
+            "we don't support partial buffers right now - read-line must be used consistently"
+        );
+        let Self { buf, parent } = &mut *self;
+        let line = std::str::from_utf8(ready!(Pin::new(parent).poll_fill_buf(cx))?)
+            .map_err(|err| std::io::Error::new(std::io::ErrorKind::Other, err))?;
+        buf.clear();
+        buf.push_str(line);
+        let bytes = line.len();
+        self.parent.cap = 0;
+        Poll::Ready(Ok(bytes))
+    }
+}
+
+impl<'a, T, F> AsyncBufRead for WithSidebands<'a, T, F>
+where
+    T: AsyncRead + Unpin,
+    F: FnMut(bool, &[u8]) -> ProgressAction + Unpin,
+{
+    fn poll_fill_buf(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<std::io::Result<&[u8]>> {
+        use std::io;
+
+        use futures_lite::FutureExt;
+        {
+            let this = self.as_mut().get_mut();
+            if this.pos >= this.cap {
+                let (ofs, cap) = loop {
+                    match this.state {
+                        State::Idle { ref mut parent } => {
+                            let parent = parent.take().expect("parent to be present here");
+                            let inactive = parent as *mut _;
+                            this.state = State::ReadLine {
+                                read_line: parent.read_line().boxed_local(),
+                                parent_inactive: Some(inactive),
+                            }
+                        }
+                        State::ReadLine {
+                            ref mut read_line,
+                            ref mut parent_inactive,
+                        } => {
+                            let line = ready!(read_line.poll(cx));
+
+                            this.state = {
+                                let parent = parent_inactive.take().expect("parent pointer always set");
+                                // SAFETY: It's safe to recover the original mutable reference (from which
+                                // the `read_line` future was created as the latter isn't accessible anymore
+                                // once the state is set to Idle. In other words, either one or the other are
+                                // accessible, never both at the same time.
+                                // Also: We keep a pointer around which is protected by borrowcheck since it's created
+                                // from a legal mutable reference which is moved into the read_line future - if it was manually
+                                // implemented we would be able to re-obtain it from there.
+                                #[allow(unsafe_code)]
+                                let parent = unsafe { &mut *parent };
+                                State::Idle { parent: Some(parent) }
+                            };
+
+                            let line = match line {
+                                Some(line) => line?.map_err(|err| io::Error::new(io::ErrorKind::Other, err))?,
+                                None => break (0, 0),
+                            };
+
+                            match this.handle_progress.as_mut() {
+                                Some(handle_progress) => {
+                                    let band = line
+                                        .decode_band()
+                                        .map_err(|err| io::Error::new(io::ErrorKind::Other, err))?;
+                                    const ENCODED_BAND: usize = 1;
+                                    match band {
+                                        BandRef::Data(d) => {
+                                            if d.is_empty() {
+                                                continue;
+                                            }
+                                            break (U16_HEX_BYTES + ENCODED_BAND, d.len());
+                                        }
+                                        BandRef::Progress(d) => {
+                                            let text = TextRef::from(d).0;
+                                            match handle_progress(false, text) {
+                                                ProgressAction::Continue => {}
+                                                ProgressAction::Interrupt => {
+                                                    return Poll::Ready(Err(io::Error::new(
+                                                        std::io::ErrorKind::Other,
+                                                        "interrupted by user",
+                                                    )))
+                                                }
+                                            };
+                                        }
+                                        BandRef::Error(d) => {
+                                            let text = TextRef::from(d).0;
+                                            match handle_progress(true, text) {
+                                                ProgressAction::Continue => {}
+                                                ProgressAction::Interrupt => {
+                                                    return Poll::Ready(Err(io::Error::new(
+                                                        io::ErrorKind::Other,
+                                                        "interrupted by user",
+                                                    )))
+                                                }
+                                            };
+                                        }
+                                    };
+                                }
+                                None => {
+                                    break match line.as_slice() {
+                                        Some(d) => (U16_HEX_BYTES, d.len()),
+                                        None => {
+                                            return Poll::Ready(Err(io::Error::new(
+                                                io::ErrorKind::UnexpectedEof,
+                                                "encountered non-data line in a data-line only context",
+                                            )))
+                                        }
+                                    }
+                                }
+                            }
+                        }
+                    }
+                };
+                this.cap = cap + ofs;
+                this.pos = ofs;
+            }
+        }
+        let range = self.pos..self.cap;
+        match &self.get_mut().state {
+            State::Idle { parent } => Poll::Ready(Ok(&parent.as_ref().expect("parent always available").buf[range])),
+            State::ReadLine { .. } => unreachable!("at least in theory"),
+        }
+    }
+
+    fn consume(self: Pin<&mut Self>, amt: usize) {
+        let this = self.get_mut();
+        this.pos = std::cmp::min(this.pos + amt, this.cap);
+    }
+}
+
+impl<'a, T, F> AsyncRead for WithSidebands<'a, T, F>
+where
+    T: AsyncRead + Unpin,
+    F: FnMut(bool, &[u8]) -> ProgressAction + Unpin,
+{
+    fn poll_read(mut self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &mut [u8]) -> Poll<std::io::Result<usize>> {
+        use std::io::Read;
+        let mut rem = ready!(self.as_mut().poll_fill_buf(cx))?;
+        let nread = rem.read(buf)?;
+        self.consume(nread);
+        Poll::Ready(Ok(nread))
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    fn receiver<T: Send>(_i: T) {}
+
+    /// We want to declare items containing pointers of `StreamingPeekableIter` `Send` as well, so it must be `Send` itself.
+    #[test]
+    fn streaming_peekable_iter_is_send() {
+        receiver(StreamingPeekableIter::new(Vec::<u8>::new(), &[], false));
+    }
+
+    #[test]
+    fn state_is_send() {
+        let mut s = StreamingPeekableIter::new(Vec::<u8>::new(), &[], false);
+        receiver(State::Idle { parent: Some(&mut s) });
+    }
+}
diff --git a/gix-packetline-blocking/src/read/sidebands/blocking_io.rs b/gix-packetline-blocking/src/read/sidebands/blocking_io.rs
new file mode 100644
index 00000000000..c4dabb093a1
--- /dev/null
+++ b/gix-packetline-blocking/src/read/sidebands/blocking_io.rs
@@ -0,0 +1,218 @@
+// DO NOT EDIT - this is a copy of gix-packetline/src/read/sidebands/blocking_io.rs. Run `just copy-packetline` to update it.
+
+use std::{io, io::BufRead};
+
+use crate::{read::ProgressAction, BandRef, PacketLineRef, StreamingPeekableIter, TextRef, U16_HEX_BYTES};
+
+/// An implementor of [`BufRead`][io::BufRead] yielding packet lines on each call to [`read_line()`][io::BufRead::read_line()].
+/// It's also possible to hide the underlying packet lines using the [`Read`][io::Read] implementation which is useful
+/// if they represent binary data, like the one of a pack file.
+pub struct WithSidebands<'a, T, F>
+where
+    T: io::Read,
+{
+    parent: &'a mut StreamingPeekableIter<T>,
+    handle_progress: Option<F>,
+    pos: usize,
+    cap: usize,
+}
+
+impl<'a, T, F> Drop for WithSidebands<'a, T, F>
+where
+    T: io::Read,
+{
+    fn drop(&mut self) {
+        self.parent.reset();
+    }
+}
+
+impl<'a, T> WithSidebands<'a, T, fn(bool, &[u8]) -> ProgressAction>
+where
+    T: io::Read,
+{
+    /// Create a new instance with the given provider as `parent`.
+    pub fn new(parent: &'a mut StreamingPeekableIter<T>) -> Self {
+        WithSidebands {
+            parent,
+            handle_progress: None,
+            pos: 0,
+            cap: 0,
+        }
+    }
+}
+
+impl<'a, T, F> WithSidebands<'a, T, F>
+where
+    T: io::Read,
+    F: FnMut(bool, &[u8]) -> ProgressAction,
+{
+    /// Create a new instance with the given `parent` provider and the `handle_progress` function.
+    ///
+    /// Progress or error information will be passed to the given `handle_progress(is_error, text)` function, with `is_error: bool`
+    /// being true in case the `text` is to be interpreted as error.
+    pub fn with_progress_handler(parent: &'a mut StreamingPeekableIter<T>, handle_progress: F) -> Self {
+        WithSidebands {
+            parent,
+            handle_progress: Some(handle_progress),
+            pos: 0,
+            cap: 0,
+        }
+    }
+
+    /// Create a new instance without a progress handler.
+    pub fn without_progress_handler(parent: &'a mut StreamingPeekableIter<T>) -> Self {
+        WithSidebands {
+            parent,
+            handle_progress: None,
+            pos: 0,
+            cap: 0,
+        }
+    }
+
+    /// Forwards to the parent [`StreamingPeekableIter::reset_with()`]
+    pub fn reset_with(&mut self, delimiters: &'static [PacketLineRef<'static>]) {
+        self.parent.reset_with(delimiters)
+    }
+
+    /// Forwards to the parent [`StreamingPeekableIter::stopped_at()`]
+    pub fn stopped_at(&self) -> Option<PacketLineRef<'static>> {
+        self.parent.stopped_at
+    }
+
+    /// Set or unset the progress handler.
+    pub fn set_progress_handler(&mut self, handle_progress: Option<F>) {
+        self.handle_progress = handle_progress;
+    }
+
+    /// Effectively forwards to the parent [`StreamingPeekableIter::peek_line()`], allowing to see what would be returned
+    /// next on a call to [`read_line()`][io::BufRead::read_line()].
+    ///
+    /// # Warning
+    ///
+    /// This skips all sideband handling and may return an unprocessed line with sidebands still contained in it.
+    pub fn peek_data_line(&mut self) -> Option<io::Result<Result<&[u8], crate::decode::Error>>> {
+        match self.parent.peek_line() {
+            Some(Ok(Ok(PacketLineRef::Data(line)))) => Some(Ok(Ok(line))),
+            Some(Ok(Err(err))) => Some(Ok(Err(err))),
+            Some(Err(err)) => Some(Err(err)),
+            _ => None,
+        }
+    }
+
+    /// Read a whole packetline from the underlying reader, with empty lines indicating a stop packetline.
+    ///
+    /// # Warning
+    ///
+    /// This skips all sideband handling and may return an unprocessed line with sidebands still contained in it.
+    pub fn read_data_line(&mut self) -> Option<io::Result<Result<PacketLineRef<'_>, crate::decode::Error>>> {
+        assert_eq!(
+            self.cap, 0,
+            "we don't support partial buffers right now - read-line must be used consistently"
+        );
+        self.parent.read_line()
+    }
+
+    /// Like `BufRead::read_line()`, but will only read one packetline at a time.
+    ///
+    /// It will also be easier to call as sometimes it's unclear which implementation we get on a type like this with
+    /// plenty of generic parameters.
+    pub fn read_line_to_string(&mut self, buf: &mut String) -> io::Result<usize> {
+        assert_eq!(
+            self.cap, 0,
+            "we don't support partial buffers right now - read-line must be used consistently"
+        );
+        let line = std::str::from_utf8(self.fill_buf()?).map_err(|err| io::Error::new(io::ErrorKind::Other, err))?;
+        buf.push_str(line);
+        let bytes = line.len();
+        self.cap = 0;
+        Ok(bytes)
+    }
+}
+
+impl<'a, T, F> BufRead for WithSidebands<'a, T, F>
+where
+    T: io::Read,
+    F: FnMut(bool, &[u8]) -> ProgressAction,
+{
+    fn fill_buf(&mut self) -> io::Result<&[u8]> {
+        if self.pos >= self.cap {
+            let (ofs, cap) = loop {
+                let line = match self.parent.read_line() {
+                    Some(line) => line?.map_err(|err| io::Error::new(io::ErrorKind::Other, err))?,
+                    None => break (0, 0),
+                };
+                match self.handle_progress.as_mut() {
+                    Some(handle_progress) => {
+                        let band = line
+                            .decode_band()
+                            .map_err(|err| io::Error::new(io::ErrorKind::Other, err))?;
+                        const ENCODED_BAND: usize = 1;
+                        match band {
+                            BandRef::Data(d) => {
+                                if d.is_empty() {
+                                    continue;
+                                }
+                                break (U16_HEX_BYTES + ENCODED_BAND, d.len());
+                            }
+                            BandRef::Progress(d) => {
+                                let text = TextRef::from(d).0;
+                                match handle_progress(false, text) {
+                                    ProgressAction::Continue => {}
+                                    ProgressAction::Interrupt => {
+                                        return Err(std::io::Error::new(
+                                            std::io::ErrorKind::Other,
+                                            "interrupted by user",
+                                        ))
+                                    }
+                                };
+                            }
+                            BandRef::Error(d) => {
+                                let text = TextRef::from(d).0;
+                                match handle_progress(true, text) {
+                                    ProgressAction::Continue => {}
+                                    ProgressAction::Interrupt => {
+                                        return Err(std::io::Error::new(
+                                            std::io::ErrorKind::Other,
+                                            "interrupted by user",
+                                        ))
+                                    }
+                                };
+                            }
+                        };
+                    }
+                    None => {
+                        break match line.as_slice() {
+                            Some(d) => (U16_HEX_BYTES, d.len()),
+                            None => {
+                                return Err(io::Error::new(
+                                    io::ErrorKind::UnexpectedEof,
+                                    "encountered non-data line in a data-line only context",
+                                ))
+                            }
+                        }
+                    }
+                }
+            };
+            self.cap = cap + ofs;
+            self.pos = ofs;
+        }
+        Ok(&self.parent.buf[self.pos..self.cap])
+    }
+
+    fn consume(&mut self, amt: usize) {
+        self.pos = std::cmp::min(self.pos + amt, self.cap);
+    }
+}
+
+impl<'a, T, F> io::Read for WithSidebands<'a, T, F>
+where
+    T: io::Read,
+    F: FnMut(bool, &[u8]) -> ProgressAction,
+{
+    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
+        let mut rem = self.fill_buf()?;
+        let nread = rem.read(buf)?;
+        self.consume(nread);
+        Ok(nread)
+    }
+}
diff --git a/gix-packetline-blocking/src/read/sidebands/mod.rs b/gix-packetline-blocking/src/read/sidebands/mod.rs
new file mode 100644
index 00000000000..d9ff58f7809
--- /dev/null
+++ b/gix-packetline-blocking/src/read/sidebands/mod.rs
@@ -0,0 +1,11 @@
+// DO NOT EDIT - this is a copy of gix-packetline/src/read/sidebands/mod.rs. Run `just copy-packetline` to update it.
+
+#[cfg(feature = "blocking-io")]
+mod blocking_io;
+#[cfg(feature = "blocking-io")]
+pub use blocking_io::WithSidebands;
+
+#[cfg(all(not(feature = "blocking-io"), feature = "async-io"))]
+mod async_io;
+#[cfg(all(not(feature = "blocking-io"), feature = "async-io"))]
+pub use async_io::WithSidebands;
diff --git a/gix-packetline-blocking/src/write/async_io.rs b/gix-packetline-blocking/src/write/async_io.rs
new file mode 100644
index 00000000000..7b76c011fa9
--- /dev/null
+++ b/gix-packetline-blocking/src/write/async_io.rs
@@ -0,0 +1,99 @@
+// DO NOT EDIT - this is a copy of gix-packetline/src/write/async_io.rs. Run `just copy-packetline` to update it.
+
+use std::{
+    io,
+    pin::Pin,
+    task::{Context, Poll},
+};
+
+use futures_io::AsyncWrite;
+
+use crate::{encode, MAX_DATA_LEN, U16_HEX_BYTES};
+
+pin_project_lite::pin_project! {
+    /// An implementor of [`Write`][io::Write] which passes all input to an inner `Write` in packet line data encoding,
+    /// one line per `write(…)` call or as many lines as it takes if the data doesn't fit into the maximum allowed line length.
+    pub struct Writer<T> {
+        #[pin]
+        inner: encode::LineWriter<'static, T>,
+        state: State,
+    }
+}
+
+enum State {
+    Idle,
+    WriteData(usize),
+}
+
+impl<T: AsyncWrite + Unpin> Writer<T> {
+    /// Create a new instance from the given `write`
+    pub fn new(write: T) -> Self {
+        Writer {
+            inner: encode::LineWriter::new(write, &[], &[]),
+            state: State::Idle,
+        }
+    }
+
+    /// Return the inner writer, consuming self.
+    pub fn into_inner(self) -> T {
+        self.inner.into_inner()
+    }
+
+    /// Return a mutable reference to the inner writer, useful if packet lines should be serialized directly.
+    pub fn inner_mut(&mut self) -> &mut T {
+        &mut self.inner.writer
+    }
+}
+
+/// Non-IO methods
+impl<T> Writer<T> {
+    /// If called, each call to [`write()`][io::Write::write()] will write bytes as is.
+    pub fn enable_binary_mode(&mut self) {
+        self.inner.suffix = &[];
+    }
+    /// If called, each call to [`write()`][io::Write::write()] will write the input as text, appending a trailing newline
+    /// if needed before writing.
+    pub fn enable_text_mode(&mut self) {
+        self.inner.suffix = &[b'\n'];
+    }
+}
+
+impl<T: AsyncWrite + Unpin> AsyncWrite for Writer<T> {
+    fn poll_write(self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &[u8]) -> Poll<io::Result<usize>> {
+        let mut this = self.project();
+        loop {
+            match this.state {
+                State::Idle => {
+                    if buf.is_empty() {
+                        return Poll::Ready(Err(io::Error::new(
+                            io::ErrorKind::Other,
+                            "empty packet lines are not permitted as '0004' is invalid",
+                        )));
+                    }
+                    *this.state = State::WriteData(0)
+                }
+                State::WriteData(written) => {
+                    while *written != buf.len() {
+                        let data = &buf[*written..*written + (buf.len() - *written).min(MAX_DATA_LEN)];
+                        let n = futures_lite::ready!(this.inner.as_mut().poll_write(cx, data))?;
+                        if n == 0 {
+                            return Poll::Ready(Err(io::ErrorKind::WriteZero.into()));
+                        }
+                        *written += n;
+                        *written -= U16_HEX_BYTES + this.inner.suffix.len();
+                    }
+                    *this.state = State::Idle;
+                    return Poll::Ready(Ok(buf.len()));
+                }
+            }
+        }
+    }
+
+    fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
+        self.project().inner.poll_flush(cx)
+    }
+
+    fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
+        self.project().inner.poll_close(cx)
+    }
+}
diff --git a/gix-packetline-blocking/src/write/blocking_io.rs b/gix-packetline-blocking/src/write/blocking_io.rs
new file mode 100644
index 00000000000..6a302ea6de7
--- /dev/null
+++ b/gix-packetline-blocking/src/write/blocking_io.rs
@@ -0,0 +1,73 @@
+// DO NOT EDIT - this is a copy of gix-packetline/src/write/blocking_io.rs. Run `just copy-packetline` to update it.
+
+use std::io;
+
+use crate::{MAX_DATA_LEN, U16_HEX_BYTES};
+
+/// An implementor of [`Write`][io::Write] which passes all input to an inner `Write` in packet line data encoding,
+/// one line per `write(…)` call or as many lines as it takes if the data doesn't fit into the maximum allowed line length.
+pub struct Writer<T> {
+    /// the `Write` implementation to which to propagate packet lines
+    inner: T,
+    binary: bool,
+}
+
+impl<T: io::Write> Writer<T> {
+    /// Create a new instance from the given `write`
+    pub fn new(write: T) -> Self {
+        Writer {
+            inner: write,
+            binary: true,
+        }
+    }
+}
+
+/// Non-IO methods
+impl<T> Writer<T> {
+    /// If called, each call to [`write()`][io::Write::write()] will write bytes as is.
+    pub fn enable_binary_mode(&mut self) {
+        self.binary = true;
+    }
+    /// If called, each call to [`write()`][io::Write::write()] will write the input as text, appending a trailing newline
+    /// if needed before writing.
+    pub fn enable_text_mode(&mut self) {
+        self.binary = false;
+    }
+    /// Return the inner writer, consuming self.
+    pub fn into_inner(self) -> T {
+        self.inner
+    }
+    /// Return a mutable reference to the inner writer, useful if packet lines should be serialized directly.
+    pub fn inner_mut(&mut self) -> &mut T {
+        &mut self.inner
+    }
+}
+
+impl<T: io::Write> io::Write for Writer<T> {
+    fn write(&mut self, mut buf: &[u8]) -> io::Result<usize> {
+        if buf.is_empty() {
+            return Err(io::Error::new(
+                io::ErrorKind::Other,
+                "empty packet lines are not permitted as '0004' is invalid",
+            ));
+        }
+
+        let mut written = 0;
+        while !buf.is_empty() {
+            let (data, rest) = buf.split_at(buf.len().min(MAX_DATA_LEN));
+            written += if self.binary {
+                crate::encode::data_to_write(data, &mut self.inner)
+            } else {
+                crate::encode::text_to_write(data, &mut self.inner)
+            }?;
+            // subtract header (and trailing NL) because write-all can't handle writing more than it passes in
+            written -= U16_HEX_BYTES + usize::from(!self.binary);
+            buf = rest;
+        }
+        Ok(written)
+    }
+
+    fn flush(&mut self) -> io::Result<()> {
+        self.inner.flush()
+    }
+}
diff --git a/gix-packetline-blocking/src/write/mod.rs b/gix-packetline-blocking/src/write/mod.rs
new file mode 100644
index 00000000000..73585e91f47
--- /dev/null
+++ b/gix-packetline-blocking/src/write/mod.rs
@@ -0,0 +1,23 @@
+// DO NOT EDIT - this is a copy of gix-packetline/src/write/mod.rs. Run `just copy-packetline` to update it.
+
+use crate::Writer;
+
+#[cfg(all(not(feature = "blocking-io"), feature = "async-io"))]
+pub(crate) mod async_io;
+
+#[cfg(feature = "blocking-io")]
+pub(crate) mod blocking_io;
+
+/// Common methods
+impl<T> Writer<T> {
+    /// As [`enable_text_mode()`][Writer::enable_text_mode()], but suitable for chaining.
+    pub fn text_mode(mut self) -> Self {
+        self.enable_text_mode();
+        self
+    }
+    /// As [`enable_binary_mode()`][Writer::enable_binary_mode()], but suitable for chaining.
+    pub fn binary_mode(mut self) -> Self {
+        self.enable_binary_mode();
+        self
+    }
+}
diff --git a/justfile b/justfile
index e8890319849..8979a7777fd 100755
--- a/justfile
+++ b/justfile
@@ -252,3 +252,7 @@ fmt:
 # Cancel this after the first few seconds, as yanked crates will appear in warnings.
 find-yanked:
     cargo install --debug --locked --no-default-features --features max-pure --path .
+
+# Delete gix-packetline-blocking/src and regenerate from gix-packetline/src
+copy-packetline:
+    ./etc/copy-packetline.sh
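For context, here is a minimal usage sketch (illustrative only, not part of the patch) of the blocking `Writer` copied into gix-packetline-blocking above. It assumes the crate is built with the `blocking-io` feature and that `Writer` is reachable at the crate root, as the `use crate::Writer` in `write/mod.rs` suggests; the exact re-export path is an assumption.

// Illustrative sketch, not part of the generated sources above.
// Assumes `gix_packetline_blocking::Writer` is the public path to the writer shown in this patch.
use std::io::Write as _;

fn main() -> std::io::Result<()> {
    let mut out = Vec::new();
    {
        // Text mode appends a trailing newline to each `write(…)` before encoding it as a
        // packet line; binary mode would pass the bytes through unchanged.
        let mut writer = gix_packetline_blocking::Writer::new(&mut out).text_mode();
        writer.write_all(b"hello")?;
    }
    // A pkt-line's 4-byte hex prefix counts itself: 4 + "hello\n".len() == 10 == 0x000a.
    assert_eq!(out, b"000ahello\n".to_vec());
    Ok(())
}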