diff --git a/crt/aws-crt-cpp b/crt/aws-crt-cpp index 25a2e61972f5..2705dbd13811 160000 --- a/crt/aws-crt-cpp +++ b/crt/aws-crt-cpp @@ -1 +1 @@ -Subproject commit 25a2e61972f5a0d517c6dc62533fdc7d7bd122a1 +Subproject commit 2705dbd13811d584dc03beb17567bd7fc1fa7069 diff --git a/prefetch_crt_dependency.sh b/prefetch_crt_dependency.sh index a559e2eb7b2b..d1e5fa9a1ecb 100755 --- a/prefetch_crt_dependency.sh +++ b/prefetch_crt_dependency.sh @@ -3,17 +3,17 @@ # SPDX-License-Identifier: Apache-2.0. CRT_URI_PREFIX=https://codeload.github.com/awslabs -CRT_URI=${CRT_URI_PREFIX}/aws-crt-cpp/zip/25a2e61972f5a0d517c6dc62533fdc7d7bd122a1 # v0.37.4 +CRT_URI=${CRT_URI_PREFIX}/aws-crt-cpp/zip/2705dbd13811d584dc03beb17567bd7fc1fa7069 # v0.38.4 AWS_C_AUTH_URI=${CRT_URI_PREFIX}/aws-c-auth/zip/fc4b87655e5cd3921f18d1859193c74af4102071 # v0.10.1 AWS_C_CAL_URI=${CRT_URI_PREFIX}/aws-c-cal/zip/1cb9412158890201a6ffceed779f90fe1f48180c # v0.9.13 AWS_C_COMMON_URI=${CRT_URI_PREFIX}/aws-c-common/zip/95515a8b1ff40d5bb14f965ca4cbbe99ad1843df # v0.12.6 AWS_C_COMPRESSION_URI=${CRT_URI_PREFIX}/aws-c-compression/zip/d8264e64f698341eb03039b96b4f44702a9b3f83 # v0.3.2 -AWS_C_EVENT_STREAM_URI=${CRT_URI_PREFIX}/aws-c-event-stream/zip/c741f95e9050a1a4bed4b3aa7543bd3e024f6e56 # v0.6.0 -AWS_C_HTTP_URI=${CRT_URI_PREFIX}/aws-c-http/zip/0d8e1a933f46b8af984dfc8168ebcdf32748c184 # v0.10.11 +AWS_C_EVENT_STREAM_URI=${CRT_URI_PREFIX}/aws-c-event-stream/zip/66cafb1d8bb1bfeb62a7601ce03d1a6fcd4798ed # v0.6.1 +AWS_C_HTTP_URI=${CRT_URI_PREFIX}/aws-c-http/zip/da535b1bf9c9334730eb78a26a1bbb3c069b38c9 # v0.10.14 AWS_C_IO_URI=${CRT_URI_PREFIX}/aws-c-io/zip/bfb0819d3906502483611ce832a5ec6b897c8421 # v0.26.1 AWS_C_MQTT_URI=${CRT_URI_PREFIX}/aws-c-mqtt/zip/41b6a7d6d566a56eff69743df66c077d56a80c9d # v0.14.0 -AWS_C_S3_URI=${CRT_URI_PREFIX}/aws-c-s3/zip/e9d1bde139f88b08aaa3bf0507f443f31ccede93 # v0.11.5 +AWS_C_S3_URI=${CRT_URI_PREFIX}/aws-c-s3/zip/ab764f57ec3cab7866c0f8b7d9e9772b7cc1b9ee # v0.12.2 AWS_C_SDKUTILS_URI=${CRT_URI_PREFIX}/aws-c-sdkutils/zip/f678bda9e21f7217e4bbf35e0d1ea59540687933 # v0.2.4 AWS_CHECKSUMS_URI=${CRT_URI_PREFIX}/aws-checksums/zip/1d5f2f1f3e5d013aae8810878ceb5b3f6f258c4e # v0.2.10 AWS_LC_URI=${CRT_URI_PREFIX}/aws-lc/zip/37d86461a95782fd5d8b77873f9e1cb134ea2f95 # v1.69.0 diff --git a/src/aws-cpp-sdk-core/include/aws/core/client/CoreErrors.h b/src/aws-cpp-sdk-core/include/aws/core/client/CoreErrors.h index 6507bbc5d56e..1f245145b6f8 100644 --- a/src/aws-cpp-sdk-core/include/aws/core/client/CoreErrors.h +++ b/src/aws-cpp-sdk-core/include/aws/core/client/CoreErrors.h @@ -47,6 +47,7 @@ namespace Aws REQUEST_TIMEOUT = 24, NOT_INITIALIZED = 25, MEMORY_ALLOCATION = 26, + NOT_IMPLEMENTED = 27, NETWORK_CONNECTION = 99, // General failure to send message to service diff --git a/src/aws-cpp-sdk-core/include/aws/core/http/HttpClient.h b/src/aws-cpp-sdk-core/include/aws/core/http/HttpClient.h index d38c77f4dc07..2116d0d579ed 100644 --- a/src/aws-cpp-sdk-core/include/aws/core/http/HttpClient.h +++ b/src/aws-cpp-sdk-core/include/aws/core/http/HttpClient.h @@ -6,11 +6,14 @@ #pragma once #include +#include +#include +#include -#include #include -#include #include +#include +#include namespace Aws { @@ -77,6 +80,16 @@ namespace Aws return !m_bad; } + using AcquireConnectionOutcome = Aws::Utils::Outcome, + Aws::Client::AWSError>; + virtual AcquireConnectionOutcome AcquireConnection(const std::shared_ptr& request) { + AWS_UNREFERENCED_PARAM(request); + return Aws::Client::AWSError{Aws::Client::CoreErrors::NOT_IMPLEMENTED, + "NotImplemented", + "creating a connection is not supported on this http client", + false}; + } + protected: bool m_bad; diff --git a/src/aws-cpp-sdk-core/include/aws/core/http/HttpClientStream.h b/src/aws-cpp-sdk-core/include/aws/core/http/HttpClientStream.h new file mode 100644 index 000000000000..05e37e71b35e --- /dev/null +++ b/src/aws-cpp-sdk-core/include/aws/core/http/HttpClientStream.h @@ -0,0 +1,20 @@ +/** +* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. + * SPDX-License-Identifier: Apache-2.0. + */ + +#pragma once +#include + +namespace Aws { +namespace Http { +class HttpResponse; +class ClientStream { +public: + virtual ~ClientStream() = default; + virtual bool Activate() = 0; + virtual int WriteData(std::shared_ptr stream, bool endStream = false) = 0; + virtual std::shared_ptr GetResponse() const = 0; +}; +} +} // namespace Aws diff --git a/src/aws-cpp-sdk-core/include/aws/core/http/HttpConnection.h b/src/aws-cpp-sdk-core/include/aws/core/http/HttpConnection.h new file mode 100644 index 000000000000..bdfedce1cc79 --- /dev/null +++ b/src/aws-cpp-sdk-core/include/aws/core/http/HttpConnection.h @@ -0,0 +1,22 @@ +/** + * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. + * SPDX-License-Identifier: Apache-2.0. + */ + +#pragma once +#include + +#include + +namespace Aws { +namespace Http { +class HttpRequest; +class Connection { + public: + virtual ~Connection() = default; + virtual std::shared_ptr NewClientStream( + const std::shared_ptr& request, + std::function onStreamComplete) = 0; +}; +} // namespace Http +} // namespace Aws diff --git a/src/aws-cpp-sdk-core/include/aws/core/http/crt/CRTHttpClient.h b/src/aws-cpp-sdk-core/include/aws/core/http/crt/CRTHttpClient.h index a0a87619042b..3a3760cefb99 100644 --- a/src/aws-cpp-sdk-core/include/aws/core/http/crt/CRTHttpClient.h +++ b/src/aws-cpp-sdk-core/include/aws/core/http/crt/CRTHttpClient.h @@ -54,7 +54,9 @@ namespace Aws bool IsDefaultAwsHttpClient() const override { return true; } - private: + AcquireConnectionOutcome AcquireConnection(const std::shared_ptr& request) override; + + private: // Yeah, I know, but someone made MakeRequest() const and didn't think about the fact that // making an HTTP request most certainly mutates state. It was me. I'm the person that did that, and // now we're stuck with it. Thanks me. diff --git a/src/aws-cpp-sdk-core/source/http/crt/CRTHttpClient.cpp b/src/aws-cpp-sdk-core/source/http/crt/CRTHttpClient.cpp index dded366705f9..0731dc81bf7e 100644 --- a/src/aws-cpp-sdk-core/source/http/crt/CRTHttpClient.cpp +++ b/src/aws-cpp-sdk-core/source/http/crt/CRTHttpClient.cpp @@ -12,8 +12,264 @@ #include #include +#include + static const char *const CRT_HTTP_CLIENT_TAG = "CRTHttpClient"; +namespace { +// Just a wrapper around a Condition Variable and a mutex, which handles wait and timed waits while protecting +// from spurious wakeups. +class AsyncWaiter { + public: + AsyncWaiter() = default; + AsyncWaiter(const AsyncWaiter&) = delete; + AsyncWaiter& operator=(const AsyncWaiter&) = delete; + + void Wakeup() { + std::lock_guard locker(m_lock); + m_wakeupIntentional = true; + m_cvar.notify_one(); + } + + void WaitOnCompletion() { + std::unique_lock uniqueLocker(m_lock); + m_cvar.wait(uniqueLocker, [this]() { return m_wakeupIntentional; }); + } + + bool WaitOnCompletionFor(const size_t ms) { + std::unique_lock uniqueLocker(m_lock); + return m_cvar.wait_for(uniqueLocker, std::chrono::milliseconds(ms), [this]() { return m_wakeupIntentional; }); + } + + private: + std::mutex m_lock; + std::condition_variable m_cvar; + bool m_wakeupIntentional{false}; +}; + +void AddRequestMetadataToCrtRequest(const std::shared_ptr& request, + const std::shared_ptr& crtRequest) { + const char* methodStr = Aws::Http::HttpMethodMapper::GetNameForHttpMethod(request->GetMethod()); + AWS_LOGSTREAM_TRACE(CRT_HTTP_CLIENT_TAG, "Making " << methodStr << " request to " << request->GetURIString()); + AWS_LOGSTREAM_TRACE(CRT_HTTP_CLIENT_TAG, "Including headers:"); + // Add http headers to the request. + for (const auto& header : request->GetHeaders()) { + Aws::Crt::Http::HttpHeader crtHeader; + AWS_LOGSTREAM_TRACE(CRT_HTTP_CLIENT_TAG, header.first << ": " << header.second); + crtHeader.name = Aws::Crt::ByteCursorFromArray((const uint8_t*)header.first.data(), header.first.length()); + crtHeader.value = Aws::Crt::ByteCursorFromArray((const uint8_t*)header.second.data(), header.second.length()); + crtRequest->AddHeader(crtHeader); + } + + // HTTP method, GET, PUT, DELETE, etc... + auto methodCursor = Aws::Crt::ByteCursorFromCString(methodStr); + crtRequest->SetMethod(methodCursor); + + // Path portion of the request + auto pathStrCpy = request->GetUri().GetURLEncodedPathRFC3986(); + auto queryStrCpy = request->GetUri().GetQueryString(); + Aws::StringStream ss; + + // CRT client has you pass the query string as part of the path. concatenate that here. + ss << pathStrCpy << queryStrCpy; + auto fullPathAndQueryCpy = ss.str(); + auto pathCursor = Aws::Crt::ByteCursorFromArray((uint8_t*)fullPathAndQueryCpy.c_str(), fullPathAndQueryCpy.length()); + crtRequest->SetPath(pathCursor); +} + +void OnResponseBodyReceived(Aws::Crt::Http::HttpStream&, const Aws::Crt::ByteCursor& body, + const std::shared_ptr& response, + const std::shared_ptr& request) { + assert(response); + for (const auto& hashIterator : request->GetResponseValidationHashes()) { + std::stringstream headerStr; + headerStr << "x-amz-checksum-" << hashIterator.first; + if (response->HasHeader(headerStr.str().c_str())) { + hashIterator.second->Update(reinterpret_cast(body.ptr), body.len); + break; + } + } + + // When data is received from the content body of the incoming response, just copy it to the output stream. + response->GetResponseBody().write((const char*)body.ptr, static_cast(body.len)); + if (response->GetResponseBody().fail()) { + const auto& ref = response->GetResponseBody(); + AWS_LOGSTREAM_ERROR(CRT_HTTP_CLIENT_TAG, "Failed to write " << body.len << " (eof: " << ref.eof() << ", bad: " << ref.bad() << ")"); + } + + if (request->IsEventStreamRequest() && !response->HasHeader(Aws::Http::X_AMZN_ERROR_TYPE)) { + response->GetResponseBody().flush(); + } + + auto& receivedHandler = request->GetDataReceivedEventHandler(); + if (receivedHandler) { + receivedHandler(request.get(), response.get(), static_cast(body.len)); + } + + AWS_LOGSTREAM_TRACE(CRT_HTTP_CLIENT_TAG, body.len << " bytes written to response."); +} + +// on response headers arriving, write them to the response. +void OnIncomingHeaders(Aws::Crt::Http::HttpStream&, enum aws_http_header_block block, const Aws::Crt::Http::HttpHeader* headersArray, + std::size_t headersCount, const std::shared_ptr& response) { + if (block == AWS_HTTP_HEADER_BLOCK_INFORMATIONAL) return; + + AWS_LOGSTREAM_TRACE(CRT_HTTP_CLIENT_TAG, "Received Headers: "); + + for (size_t i = 0; i < headersCount; ++i) { + const Aws::Crt::Http::HttpHeader* header = &headersArray[i]; + Aws::String headerNameStr((const char*)header->name.ptr, header->name.len); + Aws::String headerValueStr((const char*)header->value.ptr, header->value.len); + AWS_LOGSTREAM_TRACE(CRT_HTTP_CLIENT_TAG, headerNameStr << ": " << headerValueStr); + response->AddHeader(headerNameStr, std::move(headerValueStr)); + } +} + +void OnIncomingHeadersBlockDone(Aws::Crt::Http::HttpStream& stream, enum aws_http_header_block, + const std::shared_ptr& response) { + AWS_LOGSTREAM_TRACE(CRT_HTTP_CLIENT_TAG, "Received response code: " << stream.GetResponseStatusCode()); + response->SetResponseCode((Aws::Http::HttpResponseCode)stream.GetResponseStatusCode()); +} + +// Request is done. If there was an error set it, otherwise just wake up the cvar. +void OnStreamComplete(Aws::Crt::Http::HttpStream&, int errorCode, AsyncWaiter& waiter, + const std::shared_ptr& response) { + if (errorCode) { + // TODO: get the right error parsed out. + response->SetClientErrorType(Aws::Client::CoreErrors::NETWORK_CONNECTION); + response->SetClientErrorMessage(aws_error_debug_str(errorCode)); + } + + waiter.Wakeup(); +} + +// if the connection acquisition failed, go ahead and fail the request and wakeup the cvar. +// If it succeeded go ahead and make the request. +void OnClientConnectionAvailable(std::shared_ptr connection, int errorCode, + std::shared_ptr& connectionReference, + Aws::Crt::Http::HttpRequestOptions& requestOptions, AsyncWaiter& waiter, + const std::shared_ptr& request, + const std::shared_ptr& response, const Aws::Http::HttpClient& client) { + bool shouldContinueRequest = client.ContinueRequest(*request); + + if (!shouldContinueRequest) { + response->SetClientErrorType(Aws::Client::CoreErrors::USER_CANCELLED); + response->SetClientErrorMessage("Request cancelled by user's continuation handler"); + waiter.Wakeup(); + return; + } + + int finalErrorCode = errorCode; + if (connection) { + AWS_LOGSTREAM_DEBUG(CRT_HTTP_CLIENT_TAG, "Obtained connection handle " << (void*)connection.get()); + + auto clientStream = connection->NewClientStream(requestOptions); + connectionReference = connection; + + if (clientStream && clientStream->Activate()) { + return; + } + + finalErrorCode = aws_last_error(); + AWS_LOGSTREAM_ERROR(CRT_HTTP_CLIENT_TAG, "Initiation of request failed because " << aws_error_debug_str(finalErrorCode)); + } + + const char* errorMsg = aws_error_debug_str(finalErrorCode); + AWS_LOGSTREAM_ERROR(CRT_HTTP_CLIENT_TAG, "Obtaining connection failed because " << errorMsg); + response->SetClientErrorType(Aws::Client::CoreErrors::NETWORK_CONNECTION); + response->SetClientErrorMessage(errorMsg); + + waiter.Wakeup(); +} + +class CRTClientStream : public Aws::Http::ClientStream { + public: + CRTClientStream(std::shared_ptr stream, std::shared_ptr response, + std::shared_ptr crtRequest) + : m_stream(std::move(stream)), m_response(std::move(response)), m_crtRequest(std::move(crtRequest)) {} + ~CRTClientStream() override = default; + + bool Activate() override { + return m_stream->Activate(); + }; + + int WriteData(std::shared_ptr stream, bool endStream) override { + auto crtStream = std::make_shared(stream, Aws::Crt::ApiAllocator()); + AsyncWaiter waiter; + auto writeDataResult = m_stream->WriteData(crtStream, + [&waiter](std::shared_ptr& stream, int errorCode) -> void { + AWS_UNREFERENCED_PARAM(stream); + AWS_UNREFERENCED_PARAM(errorCode); + waiter.Wakeup(); + }, + endStream); + waiter.WaitOnCompletion(); + return writeDataResult; + } + + std::shared_ptr GetResponse() const override { return m_response; } + + private: + std::shared_ptr m_stream; + std::shared_ptr m_response; + std::shared_ptr m_crtRequest; +}; + +class CRTConnection : public Aws::Http::Connection { + public: + explicit CRTConnection(std::shared_ptr m_connection) : m_connection(std::move(m_connection)) {} + ~CRTConnection() override = default; + + std::shared_ptr NewClientStream(const std::shared_ptr& request, + std::function onStreamComplete) override { + auto crtRequest = Aws::Crt::MakeShared(Aws::Crt::g_allocator); + auto response = Aws::MakeShared(CRT_HTTP_CLIENT_TAG, request); + + AddRequestMetadataToCrtRequest(request, crtRequest); + + Aws::Crt::Http::HttpRequestOptions requestOptions{}; + requestOptions.request = crtRequest.get(); + + requestOptions.onIncomingHeaders = [response](Aws::Crt::Http::HttpStream& stream, enum aws_http_header_block block, + const Aws::Crt::Http::HttpHeader* headersArray, std::size_t headersCount) { + OnIncomingHeaders(stream, block, headersArray, headersCount, response); + }; + + requestOptions.onIncomingHeadersBlockDone = [request, response](Aws::Crt::Http::HttpStream& stream, enum aws_http_header_block block) { + OnIncomingHeadersBlockDone(stream, block, response); + auto& headersHandler = request->GetHeadersReceivedEventHandler(); + if (headersHandler) { + headersHandler(request.get(), response.get()); + } + }; + + requestOptions.onIncomingBody = [request, response](Aws::Crt::Http::HttpStream& stream, const Aws::Crt::ByteCursor& body) { + OnResponseBodyReceived(stream, body, response, request); + }; + + requestOptions.onStreamComplete = [response, onStreamComplete](Aws::Crt::Http::HttpStream &, int errorCode) -> void { + if (errorCode) { + // TODO: get the right error parsed out. + response->SetClientErrorType(Aws::Client::CoreErrors::NETWORK_CONNECTION); + response->SetClientErrorMessage(aws_error_debug_str(errorCode)); + } + onStreamComplete(); + }; + + requestOptions.UseManualDataWrites = true; + + auto crtStream = m_connection->NewClientStream(requestOptions); + if (!crtStream) { + return nullptr; + } + return Aws::MakeShared(CRT_HTTP_CLIENT_TAG, std::move(crtStream), std::move(response), std::move(crtRequest)); + } + + private: + std::shared_ptr m_connection; +}; +} // namespace + // Adapts AWS SDK input streams and rate limiters to the CRT input stream reading model. class SDKAdaptingInputStream : public Aws::Crt::Io::StdIOStreamInputStream { public: @@ -98,40 +354,6 @@ class SDKAdaptingInputStream : public Aws::Crt::Io::StdIOStreamInputStream { bool m_isStreaming; }; -// Just a wrapper around a Condition Variable and a mutex, which handles wait and timed waits while protecting -// from spurious wakeups. -class AsyncWaiter -{ -public: - AsyncWaiter() = default; - AsyncWaiter(const AsyncWaiter&) = delete; - AsyncWaiter& operator=(const AsyncWaiter&) = delete; - - void Wakeup() - { - std::lock_guard locker(m_lock); - m_wakeupIntentional = true; - m_cvar.notify_one(); - } - - void WaitOnCompletion() - { - std::unique_lock uniqueLocker(m_lock); - m_cvar.wait(uniqueLocker, [this](){return m_wakeupIntentional;}); - } - - bool WaitOnCompletionFor(const size_t ms) - { - std::unique_lock uniqueLocker(m_lock); - return m_cvar.wait_for(uniqueLocker, std::chrono::milliseconds(ms), [this](){return m_wakeupIntentional;}); - } - -private: - std::mutex m_lock; - std::condition_variable m_cvar; - bool m_wakeupIntentional{false}; -}; - namespace Aws { namespace Http @@ -202,158 +424,6 @@ namespace Aws m_connectionPools.clear(); } - static void AddRequestMetadataToCrtRequest(const std::shared_ptr& request, const std::shared_ptr& crtRequest) - { - const char* methodStr = Aws::Http::HttpMethodMapper::GetNameForHttpMethod(request->GetMethod()); - AWS_LOGSTREAM_TRACE(CRT_HTTP_CLIENT_TAG, "Making " << methodStr << " request to " << request->GetURIString()); - AWS_LOGSTREAM_TRACE(CRT_HTTP_CLIENT_TAG, "Including headers:"); - //Add http headers to the request. - for (const auto& header : request->GetHeaders()) - { - Crt::Http::HttpHeader crtHeader; - AWS_LOGSTREAM_TRACE(CRT_HTTP_CLIENT_TAG, header.first << ": " << header.second); - crtHeader.name = Crt::ByteCursorFromArray((const uint8_t *)header.first.data(), header.first.length()); - crtHeader.value = Crt::ByteCursorFromArray((const uint8_t *)header.second.data(), header.second.length()); - crtRequest->AddHeader(crtHeader); - } - - // HTTP method, GET, PUT, DELETE, etc... - auto methodCursor = Crt::ByteCursorFromCString(methodStr); - crtRequest->SetMethod(methodCursor); - - // Path portion of the request - auto pathStrCpy = request->GetUri().GetURLEncodedPathRFC3986(); - auto queryStrCpy = request->GetUri().GetQueryString(); - Aws::StringStream ss; - - //CRT client has you pass the query string as part of the path. concatenate that here. - ss << pathStrCpy << queryStrCpy; - auto fullPathAndQueryCpy = ss.str(); - auto pathCursor = Crt::ByteCursorFromArray((uint8_t *)fullPathAndQueryCpy.c_str(), fullPathAndQueryCpy.length()); - crtRequest->SetPath(pathCursor); - } - - static void OnResponseBodyReceived(Crt::Http::HttpStream& stream, const Crt::ByteCursor& body, const std::shared_ptr& response, const std::shared_ptr& request, const Http::HttpClient& client) - { - if (!client.ContinueRequest(*request) || !client.IsRequestProcessingEnabled()) - { - AWS_LOGSTREAM_INFO(CRT_HTTP_CLIENT_TAG, "Request canceled. Canceling request by closing the connection."); - stream.GetConnection().Close(); - return; - } - - //TODO: handle the read rate limiter here, once back pressure is setup. - assert(response); - for (const auto& hashIterator : request->GetResponseValidationHashes()) - { - std::stringstream headerStr; - headerStr<<"x-amz-checksum-"<HasHeader(headerStr.str().c_str())) - { - hashIterator.second->Update(reinterpret_cast(body.ptr), body.len); - break; - } - } - - // When data is received from the content body of the incoming response, just copy it to the output stream. - response->GetResponseBody().write((const char*)body.ptr, static_cast(body.len)); - if (response->GetResponseBody().fail()) { - const auto& ref = response->GetResponseBody(); - AWS_LOGSTREAM_ERROR(CRT_HTTP_CLIENT_TAG, "Failed to write " << body.len << " (eof: " << ref.eof() << ", bad: " << ref.bad() << ")"); - } - - if (request->IsEventStreamRequest() && !response->HasHeader(Aws::Http::X_AMZN_ERROR_TYPE)) - { - response->GetResponseBody().flush(); - } - - auto& receivedHandler = request->GetDataReceivedEventHandler(); - if (receivedHandler) - { - receivedHandler(request.get(), response.get(), static_cast(body.len)); - } - - AWS_LOGSTREAM_TRACE(CRT_HTTP_CLIENT_TAG, body.len << " bytes written to response."); - - } - - // on response headers arriving, write them to the response. - static void OnIncomingHeaders(Crt::Http::HttpStream&, enum aws_http_header_block block, const Crt::Http::HttpHeader* headersArray, std::size_t headersCount, const std::shared_ptr& response) - { - if (block == AWS_HTTP_HEADER_BLOCK_INFORMATIONAL) return; - - AWS_LOGSTREAM_TRACE(CRT_HTTP_CLIENT_TAG, "Received Headers: "); - - for (size_t i = 0; i < headersCount; ++i) - { - const Crt::Http::HttpHeader* header = &headersArray[i]; - Aws::String headerNameStr((const char*)header->name.ptr, header->name.len); - Aws::String headerValueStr((const char*)header->value.ptr, header->value.len); - AWS_LOGSTREAM_TRACE(CRT_HTTP_CLIENT_TAG, headerNameStr << ": " << headerValueStr); - response->AddHeader(headerNameStr, std::move(headerValueStr)); - } - } - - static void OnIncomingHeadersBlockDone(Crt::Http::HttpStream& stream, enum aws_http_header_block, const std::shared_ptr& response) - { - AWS_LOGSTREAM_TRACE(CRT_HTTP_CLIENT_TAG, "Received response code: " << stream.GetResponseStatusCode()); - response->SetResponseCode((HttpResponseCode)stream.GetResponseStatusCode()); - } - - // Request is done. If there was an error set it, otherwise just wake up the cvar. - static void OnStreamComplete(Crt::Http::HttpStream&, int errorCode, AsyncWaiter& waiter, const std::shared_ptr& response) - { - if (errorCode) - { - //TODO: get the right error parsed out. - response->SetClientErrorType(Aws::Client::CoreErrors::NETWORK_CONNECTION); - response->SetClientErrorMessage(aws_error_debug_str(errorCode)); - } - - waiter.Wakeup(); - } - - // if the connection acquisition failed, go ahead and fail the request and wakeup the cvar. - // If it succeeded go ahead and make the request. - static void OnClientConnectionAvailable(std::shared_ptr connection, int errorCode, std::shared_ptr& connectionReference, - Crt::Http::HttpRequestOptions& requestOptions, AsyncWaiter& waiter, const std::shared_ptr& request, - const std::shared_ptr& response, const HttpClient& client) - { - bool shouldContinueRequest = client.ContinueRequest(*request); - - if (!shouldContinueRequest) - { - response->SetClientErrorType(Client::CoreErrors::USER_CANCELLED); - response->SetClientErrorMessage("Request cancelled by user's continuation handler"); - waiter.Wakeup(); - return; - } - - int finalErrorCode = errorCode; - if (connection) - { - AWS_LOGSTREAM_DEBUG(CRT_HTTP_CLIENT_TAG, "Obtained connection handle " << (void*)connection.get()); - - auto clientStream = connection->NewClientStream(requestOptions); - connectionReference = connection; - - if (clientStream && clientStream->Activate()) { - return; - } - - finalErrorCode = aws_last_error(); - AWS_LOGSTREAM_ERROR(CRT_HTTP_CLIENT_TAG, "Initiation of request failed because " << aws_error_debug_str(finalErrorCode)); - - } - - const char *errorMsg = aws_error_debug_str(finalErrorCode); - AWS_LOGSTREAM_ERROR(CRT_HTTP_CLIENT_TAG, "Obtaining connection failed because " << errorMsg); - response->SetClientErrorType(Aws::Client::CoreErrors::NETWORK_CONNECTION); - response->SetClientErrorMessage(errorMsg); - - waiter.Wakeup(); - } - std::shared_ptr CRTHttpClient::MakeRequest(const std::shared_ptr& request, Aws::Utils::RateLimits::RateLimiterInterface*, Aws::Utils::RateLimits::RateLimiterInterface*) const @@ -385,7 +455,13 @@ namespace Aws requestOptions.onIncomingBody = [this, request, response](Crt::Http::HttpStream& stream, const Crt::ByteCursor& body) { - OnResponseBodyReceived(stream, body, response, request, *this); + if (!ContinueRequest(*request) || !IsRequestProcessingEnabled()) + { + AWS_LOGSTREAM_INFO(CRT_HTTP_CLIENT_TAG, "Request canceled. Canceling request by closing the connection."); + stream.GetConnection().Close(); + return; + } + OnResponseBodyReceived(stream, body, response, request); }; requestOptions.onIncomingHeaders = @@ -618,5 +694,40 @@ namespace Aws } } + Aws::Http::HttpClient::AcquireConnectionOutcome CRTHttpClient::AcquireConnection(const std::shared_ptr& request) { + auto requestConnOptions = CreateConnectionOptionsForRequest(request); + auto connectionManager = GetWithCreateConnectionManagerForRequest(request, requestConnOptions); + + if (!connectionManager) + { + return AcquireConnectionOutcome{Aws::Client::AWSError{ + Aws::Client::CoreErrors::INVALID_PARAMETER_COMBINATION, + "InvalidParameterCombination", + aws_error_debug_str(aws_last_error()), + false + }}; + } + + AcquireConnectionOutcome outcome{}; + AsyncWaiter waiter; + + connectionManager->AcquireConnection( + [&outcome, &waiter]( + std::shared_ptr acquiredConnection, int errorCode) -> void { + if (errorCode != AWS_OP_SUCCESS) { + outcome = AcquireConnectionOutcome{Aws::Client::AWSError{ + Aws::Client::CoreErrors::NETWORK_CONNECTION, + "CouldNotAcquireConnection", + aws_error_debug_str(errorCode), + false}}; + } else { + outcome = AcquireConnectionOutcome{Aws::MakeShared(CRT_HTTP_CLIENT_TAG, std::move(acquiredConnection))}; + } + waiter.Wakeup(); + }); + + waiter.WaitOnCompletion(); + return outcome; + } } }