Unverified Commit 7fa4af4a authored by John DiSanti's avatar John DiSanti Committed by GitHub
Browse files

Fix Event Stream timestamp representation (#619)

Also add ability to write headers to a buffer by themselves.

* Fix Event Stream timestamp representation

* Fix epoch milli conversions and move them into smithy-types
parent e5b7e22a
Loading
Loading
Loading
Loading
+2 −1
Original line number Diff line number Diff line
@@ -15,7 +15,8 @@ fuzz_target!(|message: Message| {
            Error::HeadersTooLong
            | Error::PayloadTooLong
            | Error::MessageTooLong
            | Error::InvalidHeaderNameLength,
            | Error::InvalidHeaderNameLength
            | Error::TimestampValueTooLarge(_),
        ) => {}
        Err(err) => panic!("unexpected error on write: {}", err),
        Ok(_) => {
+7 −0
Original line number Diff line number Diff line
@@ -3,6 +3,7 @@
 * SPDX-License-Identifier: Apache-2.0.
 */

use smithy_types::Instant;
use std::error::Error as StdError;
use std::fmt;

@@ -21,6 +22,7 @@ pub enum Error {
    MessageTooLong,
    PayloadTooLong,
    PreludeChecksumMismatch(u32, u32),
    TimestampValueTooLarge(Instant),
}

impl StdError for Error {}
@@ -49,6 +51,11 @@ impl fmt::Display for Error {
                "prelude checksum 0x{:X} didn't match expected checksum 0x{:X}",
                actual, expected
            ),
            TimestampValueTooLarge(time) => write!(
                f,
                "timestamp value {:?} is too large to fit into an i64",
                time
            ),
        }
    }
}
+18 −6
Original line number Diff line number Diff line
@@ -94,8 +94,9 @@ mod value {
                }
                TYPE_TIMESTAMP => {
                    if buffer.remaining() >= size_of::<i64>() {
                        Ok(HeaderValue::Timestamp(Instant::from_epoch_seconds(
                            buffer.get_i64(),
                        let epoch_millis = buffer.get_i64();
                        Ok(HeaderValue::Timestamp(Instant::from_epoch_millis(
                            epoch_millis,
                        )))
                    } else {
                        Err(Error::InvalidHeaderValue)
@@ -136,9 +137,12 @@ mod value {
                    buffer.put_u16(checked(val.as_bytes().len(), Error::HeaderValueTooLong)?);
                    buffer.put_slice(&val.as_bytes()[..]);
                }
                Timestamp(val) => {
                Timestamp(time) => {
                    buffer.put_u8(TYPE_TIMESTAMP);
                    buffer.put_i64(val.epoch_seconds());
                    buffer.put_i64(
                        time.to_epoch_millis()
                            .map_err(|_| Error::TimestampValueTooLarge(*time))?,
                    );
                }
                Uuid(val) => {
                    buffer.put_u8(TYPE_UUID);
@@ -236,6 +240,14 @@ impl Header {
    }
}

/// Writes the given `headers` to a `buffer`.
pub fn write_headers_to<B: BufMut>(headers: &[Header], mut buffer: B) -> Result<(), Error> {
    for header in headers {
        header.write_to(&mut buffer)?;
    }
    Ok(())
}

/// Event Stream message.
#[non_exhaustive]
#[derive(Clone, Debug, PartialEq)]
@@ -515,7 +527,7 @@ mod message_tests {
                Header::new("str", HeaderValue::String("some str".into())),
                Header::new(
                    "time",
                    HeaderValue::Timestamp(Instant::from_epoch_seconds(5_000_000_000))
                    HeaderValue::Timestamp(Instant::from_epoch_seconds(5_000_000))
                ),
                Header::new(
                    "uuid",
@@ -543,7 +555,7 @@ mod message_tests {
            .add_header(Header::new("str", HeaderValue::String("some str".into())))
            .add_header(Header::new(
                "time",
                HeaderValue::Timestamp(Instant::from_epoch_seconds(5_000_000_000)),
                HeaderValue::Timestamp(Instant::from_epoch_seconds(5_000_000)),
            ))
            .add_header(Header::new(
                "uuid",
+2 −1
Original line number Diff line number Diff line
@@ -11,8 +11,9 @@ default = ["chrono-conversions"]

[dependencies]
chrono = { version = "0.4", default-features = false, features = [] }
ryu = "1.0.5"
itoa = "0.4.0"
num-integer = "0.1"
ryu = "1.0.5"

[dev-dependencies]
base64 = "0.13.0"
+141 −4
Original line number Diff line number Diff line
@@ -5,11 +5,18 @@

use crate::instant::format::DateParseError;
use chrono::{DateTime, NaiveDateTime, Utc};
use num_integer::div_mod_floor;
use num_integer::Integer;
use std::error::Error as StdError;
use std::fmt;
use std::str::FromStr;
use std::time::{Duration, SystemTime, UNIX_EPOCH};

mod format;

const MILLIS_PER_SECOND: i64 = 1000;
const NANOS_PER_MILLI: u32 = 1_000_000;

/* ANCHOR: instant */

#[derive(Debug, PartialEq, Clone, Copy)]
@@ -132,6 +139,36 @@ impl Instant {
        self.seconds
    }

    pub fn epoch_subsecond_nanos(&self) -> u32 {
        self.subsecond_nanos
    }

    /// Converts the `Instant` to the number of milliseconds since the Unix epoch.
    /// This is fallible since `Instant` holds more precision than an `i64`, and will
    /// return a `ConversionError` for `Instant` values that can't be converted.
    pub fn to_epoch_millis(&self) -> Result<i64, ConversionError> {
        let subsec_millis = i64::from(self.subsecond_nanos).div_floor(&(NANOS_PER_MILLI as i64));
        if self.seconds < 0 {
            self.seconds
                .checked_add(1)
                .and_then(|seconds| seconds.checked_mul(MILLIS_PER_SECOND))
                .and_then(|millis| millis.checked_sub(1000 - subsec_millis))
        } else {
            self.seconds
                .checked_mul(MILLIS_PER_SECOND)
                .and_then(|millis| millis.checked_add(subsec_millis))
        }
        .ok_or(ConversionError(
            "Instant value too large to fit into i64 epoch millis",
        ))
    }

    /// Converts number of milliseconds since the Unix epoch into an `Instant`.
    pub fn from_epoch_millis(epoch_millis: i64) -> Instant {
        let (seconds, millis) = div_mod_floor(epoch_millis, MILLIS_PER_SECOND);
        Instant::from_secs_and_nanos(seconds, millis as u32 * NANOS_PER_MILLI)
    }

    pub fn fmt(&self, format: Format) -> String {
        match format {
            Format::DateTime => format::rfc3339::format(&self),
@@ -148,6 +185,18 @@ impl Instant {
    }
}

#[derive(Debug)]
#[non_exhaustive]
pub struct ConversionError(&'static str);

impl StdError for ConversionError {}

impl fmt::Display for ConversionError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(f, "{}", self.0)
    }
}

#[cfg(feature = "chrono-conversions")]
impl From<DateTime<Utc>> for Instant {
    fn from(value: DateTime<Utc>) -> Instant {
@@ -235,9 +284,97 @@ mod test {
    #[test]
    #[cfg(feature = "chrono-conversions")]
    fn chrono_conversions_round_trip() {
        let instant = Instant::from_secs_and_nanos(1234, 56789);
        for (seconds, nanos) in &[(1234, 56789), (-1234, 4321)] {
            let instant = Instant::from_secs_and_nanos(*seconds, *nanos);
            let chrono = instant.to_chrono();
            let instant_again: Instant = chrono.into();
            assert_eq!(instant, instant_again);
        }
    }

    #[derive(Debug)]
    struct EpochMillisTestCase {
        rfc3339: &'static str,
        epoch_millis: i64,
        epoch_seconds: i64,
        epoch_subsec_nanos: u32,
    }

    // These test case values were generated from the following Kotlin JVM code:
    // ```kotlin
    // val instant = Instant.ofEpochMilli(<epoch milli value>);
    // println(DateTimeFormatter.ISO_DATE_TIME.format(instant.atOffset(ZoneOffset.UTC)))
    // println(instant.epochSecond)
    // println(instant.nano)
    // ```
    const EPOCH_MILLIS_TEST_CASES: &[EpochMillisTestCase] = &[
        EpochMillisTestCase {
            rfc3339: "2021-07-30T21:20:04.123Z",
            epoch_millis: 1627680004123,
            epoch_seconds: 1627680004,
            epoch_subsec_nanos: 123000000,
        },
        EpochMillisTestCase {
            rfc3339: "1918-06-04T02:39:55.877Z",
            epoch_millis: -1627680004123,
            epoch_seconds: -1627680005,
            epoch_subsec_nanos: 877000000,
        },
        EpochMillisTestCase {
            rfc3339: "+292278994-08-17T07:12:55.807Z",
            epoch_millis: i64::MAX,
            epoch_seconds: 9223372036854775,
            epoch_subsec_nanos: 807000000,
        },
        EpochMillisTestCase {
            rfc3339: "-292275055-05-16T16:47:04.192Z",
            epoch_millis: i64::MIN,
            epoch_seconds: -9223372036854776,
            epoch_subsec_nanos: 192000000,
        },
    ];

    #[test]
    fn to_epoch_millis() {
        for test_case in EPOCH_MILLIS_TEST_CASES {
            println!("Test case: {:?}", test_case);
            let instant =
                Instant::from_secs_and_nanos(test_case.epoch_seconds, test_case.epoch_subsec_nanos);
            assert_eq!(test_case.epoch_seconds, instant.epoch_seconds());
            assert_eq!(
                test_case.epoch_subsec_nanos,
                instant.epoch_subsecond_nanos()
            );
            assert_eq!(test_case.epoch_millis, instant.to_epoch_millis().unwrap());
        }

        assert!(Instant::from_secs_and_nanos(i64::MAX, 0)
            .to_epoch_millis()
            .is_err());
    }

    #[test]
    fn from_epoch_millis() {
        for test_case in EPOCH_MILLIS_TEST_CASES {
            println!("Test case: {:?}", test_case);
            let instant = Instant::from_epoch_millis(test_case.epoch_millis);
            assert_eq!(test_case.epoch_seconds, instant.epoch_seconds());
            assert_eq!(
                test_case.epoch_subsec_nanos,
                instant.epoch_subsecond_nanos()
            );
        }
    }

    #[test]
    fn to_from_epoch_millis_round_trip() {
        for millis in &[0, 1627680004123, -1627680004123, i64::MAX, i64::MIN] {
            assert_eq!(
                *millis,
                Instant::from_epoch_millis(*millis)
                    .to_epoch_millis()
                    .unwrap()
            );
        }
    }
}