Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ hyper-rustls = { version = "0.27", features = ["http2", "logging"] }
backoff = "0.4"
log = "0.4"
time = "0.3"
educe = { "version" = "0.6", default_features=false, features = ["Debug"] }
educe = { "version" = "0.6", default-features=false, features = ["Debug"] }
once_cell = "1"
smallvec = "1"
countme = "2"
Expand Down
2 changes: 1 addition & 1 deletion Jenkinsfile
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ def publishImage = false
pipeline {
agent {
node {
label "rust-x86_64"
label "ec2-fleet"
customWorkspace("/tmp/workspace/${env.BUILD_TAG}")
}
}
Expand Down
2 changes: 1 addition & 1 deletion docker/dispatch.sh
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

curpath=$(dirname "$0")

# shellcheck disable=SC2317
# shellcheck disable=SC2317,SC2329
_term() {
docker kill "$child"
status=$(docker inspect "$child" --format='{{.State.ExitCode}}')
Expand Down
11 changes: 6 additions & 5 deletions src/body.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,15 +36,16 @@ impl core::fmt::Debug for IngestBodyBuffer {
.collect::<Result<Vec<u8>, _>>()
.unwrap();
if let Ok(b) = std::str::from_utf8(&buf) {
write!(f, "IngestBodyBuffer: {}", b)
write!(f, "IngestBodyBuffer: {b}")
} else {
write!(f, "IngestBodyBuffer: {:?}", buf)
write!(f, "IngestBodyBuffer: {buf:?}")
}
}
}

impl PartialEq for IngestBodyBuffer {
fn eq(&self, other: &Self) -> bool {
#[allow(clippy::unbuffered_bytes)]
for (a, b) in self.reader().bytes().zip(other.reader().bytes()) {
match (a, b) {
(Ok(a), Ok(b)) => {
Expand Down Expand Up @@ -148,7 +149,7 @@ impl IntoIngestBodyBuffer for IngestBody {
}

#[async_trait]
impl<'a> IntoIngestBodyBuffer for &'a IngestBody {
impl IntoIngestBodyBuffer for &IngestBody {
type Error = serde_json::error::Error;

async fn into(self) -> Result<IngestBodyBuffer, Self::Error> {
Expand Down Expand Up @@ -234,7 +235,7 @@ pub struct Line {
}

#[async_trait]
impl<'a> IngestLineSerialize<String, bytes::Bytes, HashMap<String, String>> for &'a Line {
impl IngestLineSerialize<String, bytes::Bytes, HashMap<String, String>> for &Line {
type Ok = ();

fn has_annotations(&self) -> bool {
Expand Down Expand Up @@ -734,7 +735,7 @@ impl LineMeta for Line {
}
}

impl<'a> LineMeta for &'a Line {
impl LineMeta for &Line {
fn get_annotations(&self) -> Option<&KeyValueMap> {
self.annotations.as_ref()
}
Expand Down
12 changes: 5 additions & 7 deletions src/dns.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,15 +6,15 @@ use std::sync::Arc;
use std::task::{self, Poll};

use backoff::{backoff::Backoff, exponential::ExponentialBackoff, SystemClock};
use hyper_util::client::legacy::connect::dns as hyper_dns;
use once_cell::sync::Lazy;
use tokio::sync::Mutex;
use tower::Service;
use hickory_resolver::{
config::{ResolverConfig, ResolverOpts},
lookup_ip::LookupIpIntoIter,
system_conf, TokioAsyncResolver,
};
use hyper_util::client::legacy::connect::dns as hyper_dns;
use once_cell::sync::Lazy;
use tokio::sync::Mutex;
use tower::Service;

struct ResolverInner {
resolver: TokioAsyncResolver,
Expand Down Expand Up @@ -47,9 +47,7 @@ impl HickoryDnsResolver {
.lock()
.expect("Failed to lock SYSTEM_CONF")
.as_ref()
.map_err(|e| {
io::Error::new(e.kind(), format!("error reading DNS system conf: {}", e))
})?;
.map_err(|e| io::Error::new(e.kind(), format!("error reading DNS system conf: {e}")))?;

// At this stage, we might not have been called in the context of a
// Tokio Runtime, so we must delay the actual construction of the
Expand Down
14 changes: 7 additions & 7 deletions src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -83,14 +83,14 @@ where
{
fn fmt(&self, f: &mut Formatter) -> Result<(), FmtError> {
match self {
HttpError::Send(_, ref e) => write!(f, "{}", e),
HttpError::Send(_, ref e) => write!(f, "{e}"),
HttpError::Timeout(_) => write!(f, "request timed out!"),
HttpError::Hyper(ref e) => write!(f, "{}", e),
HttpError::Build(ref e) => write!(f, "{}", e),
HttpError::Utf8(ref e) => write!(f, "{}", e),
HttpError::FromUtf8(ref e) => write!(f, "{}", e),
HttpError::Serialization(ref e) => write!(f, "{}", e),
HttpError::Other(ref e) => write!(f, "{}", e),
HttpError::Hyper(ref e) => write!(f, "{e}"),
HttpError::Build(ref e) => write!(f, "{e}"),
HttpError::Utf8(ref e) => write!(f, "{e}"),
HttpError::FromUtf8(ref e) => write!(f, "{e}"),
HttpError::Serialization(ref e) => write!(f, "{e}"),
HttpError::Other(ref e) => write!(f, "{e}"),
}
}
}
Expand Down
6 changes: 3 additions & 3 deletions src/segmented_buffer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -344,7 +344,7 @@ impl From<SegmentedPoolBufError> for std::io::Error {
fn from(err: SegmentedPoolBufError) -> std::io::Error {
match err {
SegmentedPoolBufError::Io(e) => e,
e => std::io::Error::new(std::io::ErrorKind::Other, Box::new(e)),
e => std::io::Error::other(Box::new(e)),
}
}
}
Expand Down Expand Up @@ -1056,7 +1056,7 @@ mod test {
let b = Buffer::new(BytesMut::new());
drop(b);
fence(Ordering::SeqCst);
// Ensure we havn't allocated any bufs yet
// Ensure we haven't allocated any bufs yet
let counts = countme::get::<Buffer>();
assert_eq!(counts.live, 0);
}
Expand All @@ -1071,7 +1071,7 @@ mod test {
let pool = buf.pool.clone();

fence(Ordering::SeqCst);
// Ensure we havn't allocated more bufs than necessary
// Ensure we haven't allocated more bufs than necessary
let counts = countme::get::<Buffer>();
assert!(counts.live > 0);
assert!(counts.live <= initial_pool_size / segment_size + 1);
Expand Down