chore(deps): Bump to Rust 1.72.0 #18389

Merged
13 commits merged on Sep 1, 2023
2 changes: 1 addition & 1 deletion Tiltfile
@@ -7,7 +7,7 @@ load('ext://helm_resource', 'helm_resource', 'helm_repo')
docker_build(
ref='timberio/vector',
context='.',
build_args={'RUST_VERSION': '1.71.1'},
build_args={'RUST_VERSION': '1.72.0'},
dockerfile='tilt/Dockerfile'
)

8 changes: 4 additions & 4 deletions lib/prometheus-parser/src/lib.rs
@@ -468,7 +468,7 @@ mod test {

#[test]
fn test_parse_text() {
let input = r##"
let input = r#"
# HELP http_requests_total The total number of HTTP requests.
# TYPE http_requests_total counter
http_requests_total{method="post",code="200"} 1027 1395066363000
@@ -512,7 +512,7 @@ mod test {
rpc_duration_seconds{quantile="0.99"} 76656
rpc_duration_seconds_sum 1.7560473e+07
rpc_duration_seconds_count 4.588206224e+09
"##;
"#;
let output = parse_text(input).unwrap();
assert_eq!(output.len(), 7);
match_group!(output[0], "http_requests_total", Counter => |metrics: &MetricMap<SimpleMetric>| {
@@ -651,7 +651,7 @@ mod test {

#[test]
fn test_errors() {
let input = r##"name{registry="default" content_type="html"} 1890"##;
let input = r#"name{registry="default" content_type="html"} 1890"#;
let error = parse_text(input).unwrap_err();
assert!(matches!(
error,
@@ -681,7 +681,7 @@ mod test {
}
));

let input = r##"name{registry="} 1890"##;
let input = r#"name{registry="} 1890"#;
let error = parse_text(input).unwrap_err();
assert!(matches!(
error,
8 changes: 4 additions & 4 deletions lib/prometheus-parser/src/line.rs
@@ -412,7 +412,7 @@ mod test {
assert_eq!(left, tail);
assert_eq!(r, "");

let input = wrap(r#"a\\ asdf"#);
let input = wrap(r"a\\ asdf");
let (left, r) = Metric::parse_escaped_string(&input).unwrap();
assert_eq!(left, tail);
assert_eq!(r, "a\\ asdf");
@@ -427,7 +427,7 @@ mod test {
assert_eq!(left, tail);
assert_eq!(r, "\"\\\n");

let input = wrap(r#"\\n"#);
let input = wrap(r"\\n");
let (left, r) = Metric::parse_escaped_string(&input).unwrap();
assert_eq!(left, tail);
assert_eq!(r, "\\n");
@@ -671,7 +671,7 @@ mod test {

#[test]
fn test_parse_line() {
let input = r##"
let input = r#"
# HELP http_requests_total The total number of HTTP requests.
# TYPE http_requests_total counter
http_requests_total{method="post",code="200"} 1027 1395066363000
@@ -708,7 +708,7 @@ mod test {
rpc_duration_seconds{quantile="0.99"} 76656
rpc_duration_seconds_sum 1.7560473e+07
rpc_duration_seconds_count 2693
"##;
"#;
assert!(input.lines().map(Line::parse).all(|r| r.is_ok()));
}
}
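Note on the two prometheus-parser files above: the raw-string literals lose `#` delimiters they never needed (`r##"..."##` becomes `r#"..."#`, and `r#"..."#` becomes `r"..."` where no quotes appear inside). This looks like a response to Clippy's `needless_raw_string_hashes` lint from around this toolchain, an assumption on my part since the PR does not name a lint. A minimal standalone sketch of the before/after pattern:

```rust
fn main() {
    // A raw string needs `#` delimiters only when the literal itself contains a
    // `"` that would otherwise terminate it early.
    let escaped = r"a\\ asdf"; // no quotes inside, so no `#` pair is needed
    let labeled = r#"name{registry="default"} 1890"#; // one `#` pair suffices here
    assert_eq!(escaped, "a\\\\ asdf");
    assert_eq!(labeled, "name{registry=\"default\"} 1890");
    println!("{escaped}\n{labeled}");
}
```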
5 changes: 5 additions & 0 deletions lib/vector-buffers/src/test/helpers.rs
@@ -40,6 +40,11 @@ macro_rules! await_timeout {
}};
}

/// Run a future with a temporary directory.
///
/// # Panics
///
/// Will panic if function cannot create a temp directory.
pub async fn with_temp_dir<F, Fut, V>(f: F) -> V
where
F: FnOnce(&Path) -> Fut,
1 change: 1 addition & 0 deletions lib/vector-buffers/src/test/messages.rs
@@ -33,6 +33,7 @@ macro_rules! message_wrapper {
}

impl EventCount for $id {
#[allow(clippy::redundant_closure_call)]
fn event_count(&self) -> usize {
usize::try_from($event_count(self)).unwrap_or(usize::MAX)
}
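The `#[allow(clippy::redundant_closure_call)]` added here sits on a method generated by `message_wrapper!`, where the caller-supplied `$event_count` expression is a closure that the expansion calls immediately; the allow suggests newer Clippy versions flag that pattern even when it originates from a macro argument, though that reading is my inference. A self-contained reconstruction of the shape, with a hypothetical macro and `MultiEvent` type rather than Vector's real ones:

```rust
macro_rules! message_wrapper {
    ($id:ident, $event_count:expr) => {
        struct $id(u64);

        impl $id {
            // The expansion calls the caller-supplied closure immediately, which
            // Clippy reportedly lints as `redundant_closure_call`; allowing it on
            // the generated method keeps builds clean under `-D warnings`.
            #[allow(clippy::redundant_closure_call)]
            fn event_count(&self) -> usize {
                usize::try_from($event_count(self)).unwrap_or(usize::MAX)
            }
        }
    };
}

message_wrapper!(MultiEvent, |m: &MultiEvent| m.0);

fn main() {
    let message = MultiEvent(3);
    assert_eq!(message.event_count(), 3);
}
```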
2 changes: 1 addition & 1 deletion lib/vector-buffers/src/test/variant.rs
@@ -72,7 +72,7 @@ impl Variant {
let (sender, receiver) = builder
.build(String::from("benches"), Span::none())
.await
.expect("topology build should not fail");
.unwrap_or_else(|_| unreachable!("topology build should not fail"));

(sender, receiver)
}
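This hunk (and several below) replaces `.expect(...)` with `.unwrap_or_else(|_| unreachable!(...))`. The likely motivation, not stated in the diff, is Clippy's `missing_panics_doc`: an `expect` in a public function is a documentable panic path, whereas `unreachable!` marks an internal invariant and, as far as I recall, is not counted by the lint. A generic sketch of the two styles, using a made-up `Queue` type rather than Vector's buffer topology:

```rust
struct Queue {
    items: Vec<u32>,
}

impl Queue {
    /// Pops the next item.
    ///
    /// # Panics
    ///
    /// Panics if the queue is empty.
    pub fn pop_documented(&mut self) -> u32 {
        // A plain `expect` is a real, documentable panic path.
        self.items.pop().expect("queue should not be empty")
    }

    /// Pops the next item; callers guarantee the queue is non-empty.
    pub fn pop_invariant(&mut self) -> u32 {
        // `unreachable!` signals an invariant violation rather than an expected
        // failure mode, so no `# Panics` section is written for it.
        self.items
            .pop()
            .unwrap_or_else(|| unreachable!("callers ensure the queue is non-empty"))
    }
}

fn main() {
    let mut queue = Queue { items: vec![1, 2] };
    println!("{} {}", queue.pop_documented(), queue.pop_invariant());
}
```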
16 changes: 13 additions & 3 deletions lib/vector-buffers/src/topology/acks.rs
@@ -208,6 +208,10 @@ where
///
/// Acknowledgements should be given by the caller to update the acknowledgement state before
/// trying to get any eligible markers.
///
/// # Panics
///
/// Will panic if adding ack amount overflows.
pub fn add_acknowledgements(&mut self, amount: N) {
self.unclaimed_acks = self
.unclaimed_acks
@@ -315,6 +319,10 @@ where
///
/// When other pending markers are present, and the given ID is logically behind the next
/// expected marker ID, `Err(MarkerError::MonotonicityViolation)` is returned.
///
/// # Panics
///
/// Panics if pending markers is empty when last pending marker is an unknown size.
pub fn add_marker(
&mut self,
id: N,
@@ -341,7 +349,7 @@ where
let last_marker = self
.pending_markers
.back_mut()
.expect("pending markers should not be empty");
.unwrap_or_else(|| unreachable!("pending markers should not be empty"));

last_marker.len = PendingMarkerLength::Assumed(len);
}
@@ -425,13 +433,15 @@ where
let PendingMarker { id, data, .. } = self
.pending_markers
.pop_front()
.expect("pending markers cannot be empty");
.unwrap_or_else(|| unreachable!("pending markers cannot be empty"));

if acks_to_claim > N::min_value() {
self.unclaimed_acks = self
.unclaimed_acks
.checked_sub(&acks_to_claim)
.expect("should not be able to claim more acks than are unclaimed");
.unwrap_or_else(|| {
unreachable!("should not be able to claim more acks than are unclaimed")
});
}

self.acked_marker_id = id.wrapping_add(&len.len());
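The new `# Panics` sections in `acks.rs` document the panics behind the `checked_add`/`checked_sub` calls in the method bodies, which is what `clippy::missing_panics_doc` asks for on public APIs (again an inference; the diff only shows the docs). A standalone illustration of the convention, with a hypothetical free function in place of the real acknowledgement state type:

```rust
/// Adds `amount` to a running total of unclaimed acknowledgements.
///
/// # Panics
///
/// Panics if adding `amount` overflows the counter.
pub fn add_acknowledgements(unclaimed: u64, amount: u64) -> u64 {
    // `checked_add` makes the overflow case explicit; the `expect` is the
    // documented panic path.
    unclaimed
        .checked_add(amount)
        .expect("ack counter should not overflow")
}

fn main() {
    assert_eq!(add_acknowledgements(40, 2), 42);
}
```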
6 changes: 4 additions & 2 deletions lib/vector-buffers/src/topology/builder.rs
@@ -183,6 +183,8 @@ impl<T: Bufferable> TopologyBuilder<T> {
/// This is a convenience method for `vector` as it is used for inter-transform channels, and it
/// saves callers from all the boilerplate of creating the builder, creating the stage,
/// installing buffer usage metrics that aren't required, and so on.
///
#[allow(clippy::print_stderr)]
pub async fn standalone_memory(
max_events: NonZeroUsize,
when_full: WhenFull,
@@ -193,7 +195,7 @@ impl<T: Bufferable> TopologyBuilder<T> {
let (sender, receiver) = memory_buffer
.into_buffer_parts(usage_handle.clone())
.await
.expect("should not fail to directly create a memory buffer");
.unwrap_or_else(|_| unreachable!("should not fail to directly create a memory buffer"));

let mode = match when_full {
WhenFull::Overflow => WhenFull::Block,
@@ -228,7 +230,7 @@ impl<T: Bufferable> TopologyBuilder<T> {
let (sender, receiver) = memory_buffer
.into_buffer_parts(usage_handle.clone())
.await
.expect("should not fail to directly create a memory buffer");
.unwrap_or_else(|_| unreachable!("should not fail to directly create a memory buffer"));

let mode = match when_full {
WhenFull::Overflow => WhenFull::Block,
13 changes: 9 additions & 4 deletions lib/vector-buffers/src/topology/channel/limited_queue.rs
@@ -107,14 +107,15 @@ impl<T: Bufferable> LimitedSender<T> {
.limiter
.clone()
.acquire_many_owned(permits_required)
.await else {
return Err(SendError(item))
.await
else {
return Err(SendError(item));
};

self.inner
.data
.push((permits, item))
.expect("acquired permits but channel reported being full");
.unwrap_or_else(|_| unreachable!("acquired permits but channel reported being full"));
self.inner.read_waker.notify_one();

trace!("Sent item.");
@@ -130,6 +131,10 @@ impl<T: Bufferable> LimitedSender<T> {
/// `Err(TrySendError::Disconnected)` be returned with the given `item`. If the channel has
/// insufficient capacity for the item, then `Err(TrySendError::InsufficientCapacity)` will be
/// returned with the given `item`.
///
/// # Panics
///
/// Will panic if adding ack amount overflows.
pub fn try_send(&mut self, item: T) -> Result<(), TrySendError<T>> {
// Calculate how many permits we need, and try to acquire them all without waiting.
let permits_required = self.get_required_permits_for_item(&item);
@@ -151,7 +156,7 @@ impl<T: Bufferable> LimitedSender<T> {
self.inner
.data
.push((permits, item))
.expect("acquired permits but channel reported being full");
.unwrap_or_else(|_| unreachable!("acquired permits but channel reported being full"));
self.inner.read_waker.notify_one();

trace!("Attempt to send item succeeded.");
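The reshuffled `let ... else` in `limited_queue.rs` (the `else` moved onto its own line, plus the trailing semicolon on the diverging `return`) matches the let-else formatting support rustfmt gained around this release, as far as I can tell. A small standalone example of the formatted shape, using a hypothetical `first_even` helper:

```rust
fn first_even(values: &[u32]) -> Option<u32> {
    // When the initializer wraps across lines, `else` starts its own line and
    // the diverging arm ends with an explicit semicolon, as in the hunk above.
    let Some(found) = values
        .iter()
        .copied()
        .find(|candidate| candidate % 2 == 0)
    else {
        return None;
    };
    Some(found)
}

fn main() {
    assert_eq!(first_even(&[1, 3, 4]), Some(4));
    assert_eq!(first_even(&[1, 3]), None);
}
```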
2 changes: 1 addition & 1 deletion lib/vector-buffers/src/topology/channel/sender.rs
@@ -201,7 +201,7 @@ impl<T: Bufferable> BufferSender<T> {
sent_to_base = false;
self.overflow
.as_mut()
.expect("overflow must exist")
.unwrap_or_else(|| unreachable!("overflow must exist"))
.send(item)
.await?;
}
2 changes: 1 addition & 1 deletion lib/vector-buffers/src/variants/disk_v2/common.rs
@@ -348,7 +348,7 @@ where
return Err(BuildError::InvalidParameter {
param_name: "max_record_size",
reason: "must be less than 2^64 bytes".to_string(),
})
});
};

if max_record_size_converted > max_data_file_size {
2 changes: 1 addition & 1 deletion lib/vector-buffers/src/variants/disk_v2/ledger.rs
@@ -735,6 +735,6 @@ where
)
.field("writer_done", &self.writer_done.load(Ordering::Acquire))
.field("last_flush", &self.last_flush.load())
.finish()
.finish_non_exhaustive()
}
}
2 changes: 1 addition & 1 deletion lib/vector-buffers/src/variants/disk_v2/reader.rs
@@ -855,7 +855,7 @@ where
// If there's an error decoding the item, just fall back to the slow path,
// because this file might actually be where we left off, so we don't want
// to incorrectly skip ahead or anything.
break
break;
};

// We have to remove 1 from the event count here because otherwise the ID would
@@ -80,7 +80,7 @@ impl fmt::Debug for Record {
.field("event_count", &self.event_count)
.field("encoded_len", &self.encoded_len())
.field("archived_len", &self.archived_len())
.finish()
.finish_non_exhaustive()
}
}
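Both `Debug` implementations in this PR switch `finish()` to `finish_non_exhaustive()`, which prints a trailing `..` to signal that some fields are deliberately omitted from the output; this reads like a fix for Clippy's `missing_fields_in_debug` lint, though that is my inference. A compilable sketch with a stand-in `Record` type that hides its payload:

```rust
use std::fmt;

struct Record {
    id: u64,
    payload: Vec<u8>, // deliberately omitted from the Debug output
}

impl fmt::Debug for Record {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct("Record")
            .field("id", &self.id)
            // `finish_non_exhaustive()` renders a trailing `..` so readers know
            // not every field is shown.
            .finish_non_exhaustive()
    }
}

fn main() {
    let record = Record { id: 7, payload: vec![1, 2, 3] };
    assert_eq!(record.payload.len(), 3);
    assert_eq!(format!("{record:?}"), "Record { id: 7, .. }");
}
```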

4 changes: 2 additions & 2 deletions lib/vector-common/src/event_test_util.rs
@@ -15,7 +15,7 @@ pub fn contains_name_once(pattern: &str) -> Result<(), String> {
EVENTS_RECORDED.with(|events| {
let mut n_events = 0;
let mut names = String::new();
for event in events.borrow().iter() {
for event in &*events.borrow() {
if event.ends_with(pattern) {
if n_events > 0 {
names.push_str(", ");
@@ -44,7 +44,7 @@ pub fn clear_recorded_events() {
#[allow(clippy::print_stdout)]
pub fn debug_print_events() {
EVENTS_RECORDED.with(|events| {
for event in events.borrow().iter() {
for event in &*events.borrow() {
println!("{event}");
}
});
2 changes: 1 addition & 1 deletion lib/vector-common/src/finalization.rs
@@ -76,7 +76,7 @@ impl EventFinalizers {

/// Merges the event finalizers from `other` into the collection.
pub fn merge(&mut self, other: Self) {
self.0.extend(other.0.into_iter());
self.0.extend(other.0);
}

/// Updates the status of all event finalizers in the collection.
2 changes: 1 addition & 1 deletion lib/vector-config-common/src/schema/json_schema.rs
@@ -563,7 +563,7 @@ impl<T: Clone> Extend<T> for SingleOrVec<T> {
fn extend<I: IntoIterator<Item = T>>(&mut self, iter: I) {
match self {
Self::Single(item) => {
*self = Self::Vec(iter::once(*item.clone()).chain(iter.into_iter()).collect());
*self = Self::Vec(iter::once(*item.clone()).chain(iter).collect());
}
Self::Vec(items) => items.extend(iter),
}
2 changes: 1 addition & 1 deletion lib/vector-config-macros/src/ast/mod.rs
@@ -4,7 +4,7 @@ use serde_derive_internals::{ast as serde_ast, attr as serde_attr};

mod container;
mod field;
pub(self) mod util;
mod util;
mod variant;

pub use container::Container;
6 changes: 1 addition & 5 deletions lib/vector-config/src/schema/visitors/merge.rs
@@ -128,11 +128,7 @@ fn merge_schema_instance_type(
source: Option<&SingleOrVec<InstanceType>>,
) {
merge_optional_with(destination, source, |existing, new| {
let mut deduped = existing
.into_iter()
.chain(new.into_iter())
.cloned()
.collect::<Vec<_>>();
let mut deduped = existing.into_iter().chain(new).cloned().collect::<Vec<_>>();
deduped.dedup();

*existing = deduped.into();
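The `finalization.rs`, `json_schema.rs`, and `merge.rs` hunks all drop an explicit `.into_iter()` where the receiving API (`Extend::extend`, `Iterator::chain`) already accepts any `IntoIterator`; newer Clippy flags that as `useless_conversion`, if memory serves. A small standalone example of the same simplification:

```rust
fn main() {
    let mut collected: Vec<u32> = vec![1, 2];
    let extra = vec![3, 4];

    // `extend` and `chain` take `IntoIterator`, so passing the collection
    // directly is enough; an explicit `.into_iter()` would be redundant.
    collected.extend(extra);
    let chained: Vec<u32> = collected.iter().copied().chain(vec![5, 6]).collect();

    assert_eq!(chained, [1, 2, 3, 4, 5, 6]);
}
```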
2 changes: 1 addition & 1 deletion lib/vector-config/src/schema/visitors/mod.rs
@@ -5,7 +5,7 @@ pub mod scoped_visit;
mod unevaluated;

#[cfg(test)]
pub(self) mod test;
mod test;

pub use self::human_name::GenerateHumanFriendlyNameVisitor;
pub use self::inline_single::InlineSingleUseReferencesVisitor;
4 changes: 4 additions & 0 deletions lib/vector-core/src/config/log_schema.rs
@@ -120,6 +120,10 @@ impl LogSchema {
///
/// This should only be used where the result will either be cached,
/// or performance isn't critical, since this requires memory allocation.
///
/// # Panics
///
/// Panics if the path in `self.message_key` is invalid.
pub fn owned_message_path(&self) -> OwnedTargetPath {
self.message_key
.path
4 changes: 2 additions & 2 deletions lib/vector-core/src/event/discriminant.rs
@@ -145,13 +145,13 @@ fn hash_f64<H: Hasher>(hasher: &mut H, value: f64) {
}

fn hash_array<H: Hasher>(hasher: &mut H, array: &[Value]) {
for val in array.iter() {
for val in array {
hash_value(hasher, val);
}
}

fn hash_map<H: Hasher>(hasher: &mut H, map: &BTreeMap<String, Value>) {
for (key, val) in map.iter() {
for (key, val) in map {
hasher.write(key.as_bytes());
hash_value(hasher, val);
}
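Here and in `event_test_util.rs` above, explicit `.iter()` calls in `for` loops become loops over a reference (`for val in array`, `for event in &*events.borrow()`), the form preferred by Clippy's pedantic `explicit_iter_loop` lint, I believe. An equivalent standalone snippet:

```rust
use std::collections::BTreeMap;

fn main() {
    let values = vec![1, 2, 3];
    let mut map = BTreeMap::new();
    map.insert("a", 1);

    // Iterating over `&collection` is the same as calling `.iter()` explicitly;
    // the shorter form is what the loops above were rewritten to use.
    let mut total = 0;
    for value in &values {
        total += value;
    }
    for (_key, value) in &map {
        total += value;
    }
    assert_eq!(total, 7);
}
```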
2 changes: 1 addition & 1 deletion lib/vector-core/src/event/log_event.rs
@@ -476,7 +476,7 @@ impl LogEvent {
for field in fields {
let field_path = event_path!(field.as_ref());
let Some(incoming_val) = incoming.remove(field_path) else {
continue
continue;
};
match self.get_mut(field_path) {
None => {
2 changes: 1 addition & 1 deletion lib/vector-core/src/event/lua/event.rs
@@ -41,7 +41,7 @@ impl<'a> FromLua<'a> for Event {
from: value.type_name(),
to: "Event",
message: Some("Event should be a Lua table".to_string()),
})
});
};
match (table.raw_get("log")?, table.raw_get("metric")?) {
(LuaValue::Table(log), LuaValue::Nil) => {
4 changes: 4 additions & 0 deletions lib/vector-core/src/event/lua/util.rs
@@ -41,6 +41,10 @@ pub fn table_is_timestamp(t: &LuaTable<'_>) -> LuaResult<bool> {
/// # Errors
///
/// This function will fail if the table is malformed.
///
/// # Panics
///
/// Panics if the resulting timestamp is invalid.
#[allow(clippy::needless_pass_by_value)] // constrained by mlua types
pub fn table_to_timestamp(t: LuaTable<'_>) -> LuaResult<DateTime<Utc>> {
let year = t.raw_get("year")?;