Unverified Commit 64144f40 authored by ysaito1001's avatar ysaito1001 Committed by GitHub
Browse files

Enable event stream operations with RPC bound protocols in client SDKs (#4036)

## Motivation and Context
Addresses aws-sdk-rust#213 and aws-sdk-rust#1188.

## Description
This PR consolidates the changes from the previous sub-PRs:
- https://github.com/smithy-lang/smithy-rs/pull/4004
- https://github.com/smithy-lang/smithy-rs/pull/4015
- https://github.com/smithy-lang/smithy-rs/pull/4023

Together, this PR enables event stream operations with RPC bound
protocols, including `SubscribeToShard` in Kinesis and `StartLiveTail`
in CloudWatchLogs in the Rust SDK.

## Testing
(consolidated bullets from the previous sub-PRs)
- Added a service integration test for cloudwatchlogs
- Added client SDK codegen tests in
`ClientEventStreamUnmarshallerGeneratorTest` to reverify
initial-response message handling
- Added client SDK codegen tests in
`ClientEventStreamMarshallerGeneratorTest` to verify initial-request
message handling
- Added `DisableStalledStreamProtectionTest`
- Confirmed a successful run in the release pipeline 

## Checklist
<!--- If a checkbox below is not applicable, then please DELETE it
rather than leaving it unchecked -->
- [x] For changes to the smithy-rs codegen or runtime crates, I have
created a changelog entry Markdown file in the `.changelog` directory,
specifying "client," "server," or both in the `applies_to` key.
- [x] For changes to the AWS SDK, generated SDK code, or SDK runtime
crates, I have created a changelog entry Markdown file in the
`.changelog` directory, specifying "aws-sdk-rust" in the `applies_to`
key.

----

_By submitting this pull request, I confirm that you can use, modify,
copy, and redistribute this contribution, under the terms of your
choice._
parent 357415f3
Loading
Loading
Loading
Loading
+13 −0
Original line number Diff line number Diff line
---
applies_to:
- aws-sdk-rust
authors:
- ysaito1001
references:
- aws-sdk-rust#213
- aws-sdk-rust#1188
breaking: false
new_feature: true
bug_fix: false
---
Adds support for event stream operations with non-REST protocols such as AWS JSON. This update enables operations, including `SubscribeToShard` in Kinesis and `StartLiveTail` in CloudWatchLogs in the Rust SDK.
+12 −0
Original line number Diff line number Diff line
---
applies_to:
- client
authors:
- ysaito1001
references:
- smithy-rs#121
breaking: false
new_feature: true
bug_fix: false
---
Adds support for event stream operations with non-REST protocols such as RPC v2 CBOR.
+2 −4
Original line number Diff line number Diff line
@@ -9,6 +9,7 @@ import software.amazon.smithy.rust.codegen.client.smithy.customizations.DocsRsMe
import software.amazon.smithy.rust.codegen.client.smithy.customizations.DocsRsMetadataSettings
import software.amazon.smithy.rust.codegen.client.smithy.customize.ClientCodegenDecorator
import software.amazon.smithy.rust.codegen.client.smithy.customize.CombinedClientCodegenDecorator
import software.amazon.smithy.rustsdk.customize.AwsDisableStalledStreamProtection
import software.amazon.smithy.rustsdk.customize.DisabledAuthDecorator
import software.amazon.smithy.rustsdk.customize.IsTruncatedPaginatorDecorator
import software.amazon.smithy.rustsdk.customize.RemoveDefaultsDecorator
@@ -18,7 +19,6 @@ import software.amazon.smithy.rustsdk.customize.applyExceptFor
import software.amazon.smithy.rustsdk.customize.dsql.DsqlDecorator
import software.amazon.smithy.rustsdk.customize.ec2.Ec2Decorator
import software.amazon.smithy.rustsdk.customize.glacier.GlacierDecorator
import software.amazon.smithy.rustsdk.customize.lambda.LambdaDecorator
import software.amazon.smithy.rustsdk.customize.onlyApplyTo
import software.amazon.smithy.rustsdk.customize.rds.RdsDecorator
import software.amazon.smithy.rustsdk.customize.route53.Route53Decorator
@@ -30,7 +30,6 @@ import software.amazon.smithy.rustsdk.customize.s3control.S3ControlDecorator
import software.amazon.smithy.rustsdk.customize.sso.SSODecorator
import software.amazon.smithy.rustsdk.customize.sts.STSDecorator
import software.amazon.smithy.rustsdk.customize.timestream.TimestreamDecorator
import software.amazon.smithy.rustsdk.customize.transcribestreaming.TranscribeStreamingDecorator
import software.amazon.smithy.rustsdk.endpoints.AwsEndpointsStdLib
import software.amazon.smithy.rustsdk.endpoints.OperationInputTestDecorator
import software.amazon.smithy.rustsdk.endpoints.RequireEndpointRules
@@ -67,6 +66,7 @@ val DECORATORS: List<ClientCodegenDecorator> =
            ServiceEnvConfigDecorator(),
            HttpRequestCompressionDecorator(),
            DisablePayloadSigningDecorator(),
            AwsDisableStalledStreamProtection(),
            // TODO(https://github.com/smithy-lang/smithy-rs/issues/3863): Comment in once the issue has been resolved
            // SmokeTestsDecorator(),
        ),
@@ -80,7 +80,6 @@ val DECORATORS: List<ClientCodegenDecorator> =
        DsqlDecorator().onlyApplyTo("com.amazonaws.dsql#DSQL"),
        Ec2Decorator().onlyApplyTo("com.amazonaws.ec2#AmazonEC2"),
        GlacierDecorator().onlyApplyTo("com.amazonaws.glacier#Glacier"),
        LambdaDecorator().onlyApplyTo("com.amazonaws.lambda#AWSGirApiService"),
        RdsDecorator().onlyApplyTo("com.amazonaws.rds#AmazonRDSv19"),
        Route53Decorator().onlyApplyTo("com.amazonaws.route53#AWSDnsV20130401"),
        "com.amazonaws.s3#AmazonS3".applyDecorators(
@@ -95,7 +94,6 @@ val DECORATORS: List<ClientCodegenDecorator> =
        SSODecorator().onlyApplyTo("com.amazonaws.sso#SWBPortalService"),
        TimestreamDecorator().onlyApplyTo("com.amazonaws.timestreamwrite#Timestream_20181101"),
        TimestreamDecorator().onlyApplyTo("com.amazonaws.timestreamquery#Timestream_20181101"),
        TranscribeStreamingDecorator().onlyApplyTo("com.amazonaws.transcribestreaming#Transcribe"),
        // Only build docs-rs for linux to reduce load on docs.rs
        listOf(
            DocsRsMetadataDecorator(
+19 −9
Original line number Diff line number Diff line
@@ -34,9 +34,11 @@ import software.amazon.smithy.rust.codegen.core.rustlang.CargoDependency.Compani
import software.amazon.smithy.rust.codegen.core.rustlang.DependencyScope
import software.amazon.smithy.rust.codegen.core.rustlang.Writable
import software.amazon.smithy.rust.codegen.core.rustlang.writable
import software.amazon.smithy.rust.codegen.core.smithy.RuntimeConfig
import software.amazon.smithy.rust.codegen.core.smithy.generators.LibRsCustomization
import software.amazon.smithy.rust.codegen.core.smithy.generators.LibRsSection
import software.amazon.smithy.rust.codegen.core.testutil.testDependenciesOnly
import software.amazon.smithy.rust.codegen.core.util.hasEventStreamOperations
import software.amazon.smithy.rustsdk.AwsCargoDependency.awsConfig
import software.amazon.smithy.rustsdk.AwsCargoDependency.awsRuntime
import java.nio.file.Files
@@ -76,12 +78,14 @@ class IntegrationTestDecorator : ClientCodegenDecorator {
}

class IntegrationTestDependencies(
    private val codegenContext: ClientCodegenContext,
    codegenContext: ClientCodegenContext,
    private val moduleName: String,
    private val hasTests: Boolean,
    private val hasBenches: Boolean,
) : LibRsCustomization() {
    private val runtimeConfig = codegenContext.runtimeConfig
    private val serviceShape = codegenContext.serviceShape
    private val model = codegenContext.model

    override fun section(section: LibRsSection) =
        when (section) {
@@ -89,16 +93,16 @@ class IntegrationTestDependencies(
                testDependenciesOnly {
                    if (hasTests) {
                        val smithyAsync =
                            CargoDependency.smithyAsync(codegenContext.runtimeConfig)
                            CargoDependency.smithyAsync(runtimeConfig)
                                .copy(features = setOf("test-util"), scope = DependencyScope.Dev)
                        val smithyTypes =
                            CargoDependency.smithyTypes(codegenContext.runtimeConfig)
                            CargoDependency.smithyTypes(runtimeConfig)
                                .copy(features = setOf("test-util"), scope = DependencyScope.Dev)
                        addDependency(awsRuntime(runtimeConfig).toDevDependency().withFeature("test-util"))
                        addDependency(FuturesUtil)
                        addDependency(FuturesUtil.toDevDependency())
                        addDependency(SerdeJson)
                        addDependency(smithyAsync)
                        addDependency(smithyProtocolTestHelpers(codegenContext.runtimeConfig))
                        addDependency(smithyProtocolTestHelpers(runtimeConfig))
                        addDependency(smithyRuntime(runtimeConfig).copy(features = setOf("test-util", "wire-mock"), scope = DependencyScope.Dev))
                        addDependency(smithyRuntimeApiTestUtil(runtimeConfig))
                        addDependency(smithyTypes)
@@ -109,6 +113,12 @@ class IntegrationTestDependencies(
                    if (hasBenches) {
                        addDependency(Criterion)
                    }
                    if (serviceShape.hasEventStreamOperations(model)) {
                        addDependency(
                            CargoDependency.smithyEventStream(runtimeConfig)
                                .copy(features = setOf("test-util"), scope = DependencyScope.Dev),
                        )
                    }
                    for (serviceSpecific in serviceSpecificCustomizations()) {
                        serviceSpecific.section(section)(this)
                    }
@@ -120,7 +130,7 @@ class IntegrationTestDependencies(
    private fun serviceSpecificCustomizations(): List<LibRsCustomization> =
        when (moduleName) {
            "transcribestreaming" -> listOf(TranscribeTestDependencies())
            "s3" -> listOf(S3TestDependencies(codegenContext))
            "s3" -> listOf(S3TestDependencies(runtimeConfig))
            "dynamodb" -> listOf(DynamoDbTestDependencies())
            else -> emptyList()
        }
@@ -142,11 +152,11 @@ class DynamoDbTestDependencies : LibRsCustomization() {
        }
}

class S3TestDependencies(private val codegenContext: ClientCodegenContext) : LibRsCustomization() {
class S3TestDependencies(private val runtimeConfig: RuntimeConfig) : LibRsCustomization() {
    override fun section(section: LibRsSection): Writable =
        writable {
            addDependency(awsConfig(codegenContext.runtimeConfig).toDevDependency().withFeature("behavior-version-latest"))
            addDependency(smithyExperimental(codegenContext.runtimeConfig).toDevDependency())
            addDependency(awsConfig(runtimeConfig).toDevDependency().withFeature("behavior-version-latest"))
            addDependency(smithyExperimental(runtimeConfig).toDevDependency())
            addDependency(AsyncStd)
            addDependency(BytesUtils.toDevDependency())
            addDependency(FastRand.toDevDependency())
+17 −8
Original line number Diff line number Diff line
@@ -3,7 +3,7 @@
 * SPDX-License-Identifier: Apache-2.0
 */

package software.amazon.smithy.rustsdk.customize.lambda
package software.amazon.smithy.rustsdk.customize

import software.amazon.smithy.model.Model
import software.amazon.smithy.model.shapes.OperationShape
@@ -12,22 +12,31 @@ import software.amazon.smithy.model.shapes.ShapeId
import software.amazon.smithy.model.transform.ModelTransformer
import software.amazon.smithy.rust.codegen.client.smithy.ClientRustSettings
import software.amazon.smithy.rust.codegen.client.smithy.customize.ClientCodegenDecorator
import software.amazon.smithy.rust.codegen.client.smithy.traits.IncompatibleWithStalledStreamProtectionTrait
import software.amazon.smithy.rust.codegen.client.smithy.transformers.DisableStalledStreamProtection
import software.amazon.smithy.rust.codegen.core.util.letIf
import java.util.logging.Logger

/**
 * Top level decorator for Lambda
 * Disables stalled stream protection for specific operations.
 *
 * While a generic client-level decorator, `DisableStalledStreamProtection`, exists to handle this
 * at the model level, certain cases require operation-specific removal criteria that cannot be
 * generalized. (If we can fully generate the criteria, this class can be removed.)
 *
 * This class serves as a centralized solution for disabling stalled stream protection in such cases,
 * preventing the need for service-specific decorators solely for this purpose.
 */
class LambdaDecorator : ClientCodegenDecorator {
class AwsDisableStalledStreamProtection : ClientCodegenDecorator {
    // These long-running operations may have times with no data transfer,
    // violating stalled stream protection.
    private val operationsIncompatibleWithStalledStreamProtection =
        setOf(
            ShapeId.from("com.amazonaws.lambda#Invoke"),
            ShapeId.from("com.amazonaws.lambda#InvokeAsync"),
            ShapeId.from("com.amazonaws.lambda#InvokeWithResponseStream"),
            ShapeId.from("com.amazonaws.s3#CopyObject"),
        )

    override val name: String = "Lambda"
    override val name: String = "AwsDisableStalledStreamProtection"
    override val order: Byte = 0
    private val logger = Logger.getLogger(javaClass.name)

@@ -37,9 +46,9 @@ class LambdaDecorator : ClientCodegenDecorator {
        settings: ClientRustSettings,
    ): Model =
        ModelTransformer.create().mapShapes(model) { shape ->
            shape.letIf(shape.id in operationsIncompatibleWithStalledStreamProtection) {
            shape.letIf(operationsIncompatibleWithStalledStreamProtection.contains(shape.id)) {
                logger.info("Adding IncompatibleWithStalledStreamProtection trait to $it")
                (it as OperationShape).toBuilder().addTrait(IncompatibleWithStalledStreamProtectionTrait()).build()
                (DisableStalledStreamProtection::transformOperation)((it as OperationShape))
            }
        }
}
Loading