diff --git a/CMakeLists.txt b/CMakeLists.txt index 10817f68c..e92b1b247 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -183,6 +183,7 @@ set(DUCKDB_SRC_FILES src/duckdb/src/main/extension/extension_install.cpp src/duckdb/src/main/extension/extension_load.cpp src/duckdb/src/main/extension/extension_util.cpp + src/duckdb/ub_src_main_http.cpp src/duckdb/ub_src_main_relation.cpp src/duckdb/ub_src_main_secret.cpp src/duckdb/ub_src_main_settings.cpp diff --git a/src/duckdb/extension/core_functions/include/core_functions/scalar/string_functions.hpp b/src/duckdb/extension/core_functions/include/core_functions/scalar/string_functions.hpp index e37ce8af7..14fbb9a08 100644 --- a/src/duckdb/extension/core_functions/include/core_functions/scalar/string_functions.hpp +++ b/src/duckdb/extension/core_functions/include/core_functions/scalar/string_functions.hpp @@ -100,7 +100,7 @@ struct FormatFun { struct FormatBytesFun { static constexpr const char *Name = "format_bytes"; static constexpr const char *Parameters = "bytes"; - static constexpr const char *Description = "Converts bytes to a human-readable presentation (e.g. 16000 -> 15.6 KiB)"; + static constexpr const char *Description = "Converts bytes to a human-readable presentation (e.g., 16000 -> 15.6 KiB)"; static constexpr const char *Example = "format_bytes(1000 * 16)"; static constexpr const char *Categories = ""; @@ -116,7 +116,7 @@ struct FormatreadablesizeFun { struct FormatreadabledecimalsizeFun { static constexpr const char *Name = "formatReadableDecimalSize"; static constexpr const char *Parameters = "bytes"; - static constexpr const char *Description = "Converts bytes to a human-readable presentation (e.g. 16000 -> 16.0 KB)"; + static constexpr const char *Description = "Converts bytes to a human-readable presentation (e.g., 16000 -> 16.0 KB)"; static constexpr const char *Example = "format_bytes(1000 * 16)"; static constexpr const char *Categories = ""; diff --git a/src/duckdb/extension/json/json_functions/read_json.cpp b/src/duckdb/extension/json/json_functions/read_json.cpp index b0da448a8..ed41d3a81 100644 --- a/src/duckdb/extension/json/json_functions/read_json.cpp +++ b/src/duckdb/extension/json/json_functions/read_json.cpp @@ -132,6 +132,10 @@ class JSONSchemaTask : public BaseExecutorTask { } } + string TaskType() const override { + return "JSONSchemaTask"; + } + private: AutoDetectState &auto_detect_state; JSONStructureNode &node; diff --git a/src/duckdb/src/catalog/catalog_entry/duck_schema_entry.cpp b/src/duckdb/src/catalog/catalog_entry/duck_schema_entry.cpp index dd8a91305..a0f40ce82 100644 --- a/src/duckdb/src/catalog/catalog_entry/duck_schema_entry.cpp +++ b/src/duckdb/src/catalog/catalog_entry/duck_schema_entry.cpp @@ -68,11 +68,14 @@ static void FindForeignKeyInformation(TableCatalogEntry &table, AlterForeignKeyT } DuckSchemaEntry::DuckSchemaEntry(Catalog &catalog, CreateSchemaInfo &info) - : SchemaCatalogEntry(catalog, info), tables(catalog, make_uniq(catalog, *this)), - indexes(catalog), table_functions(catalog, make_uniq(catalog, *this)), + : SchemaCatalogEntry(catalog, info), + tables(catalog, catalog.IsSystemCatalog() ? make_uniq(catalog, *this) : nullptr), + indexes(catalog), + table_functions(catalog, + catalog.IsSystemCatalog() ? make_uniq(catalog, *this) : nullptr), copy_functions(catalog), pragma_functions(catalog), - functions(catalog, make_uniq(catalog, *this)), sequences(catalog), collations(catalog), - types(catalog, make_uniq(catalog, *this)) { + functions(catalog, catalog.IsSystemCatalog() ? make_uniq(catalog, *this) : nullptr), + sequences(catalog), collations(catalog), types(catalog, make_uniq(catalog, *this)) { } unique_ptr DuckSchemaEntry::Copy(ClientContext &context) const { diff --git a/src/duckdb/src/common/enum_util.cpp b/src/duckdb/src/common/enum_util.cpp index 72a7b42bf..c51440d53 100644 --- a/src/duckdb/src/common/enum_util.cpp +++ b/src/duckdb/src/common/enum_util.cpp @@ -68,6 +68,7 @@ #include "duckdb/common/file_buffer.hpp" #include "duckdb/common/file_open_flags.hpp" #include "duckdb/common/filename_pattern.hpp" +#include "duckdb/common/http_util.hpp" #include "duckdb/common/multi_file/multi_file_data.hpp" #include "duckdb/common/multi_file/multi_file_list.hpp" #include "duckdb/common/multi_file/multi_file_options.hpp" @@ -1976,6 +1977,86 @@ HLLStorageType EnumUtil::FromString(const char *value) { return static_cast(StringUtil::StringToEnum(GetHLLStorageTypeValues(), 2, "HLLStorageType", value)); } +const StringUtil::EnumStringLiteral *GetHTTPStatusCodeValues() { + static constexpr StringUtil::EnumStringLiteral values[] { + { static_cast(HTTPStatusCode::INVALID), "INVALID" }, + { static_cast(HTTPStatusCode::Continue_100), "Continue_100" }, + { static_cast(HTTPStatusCode::SwitchingProtocol_101), "SwitchingProtocol_101" }, + { static_cast(HTTPStatusCode::Processing_102), "Processing_102" }, + { static_cast(HTTPStatusCode::EarlyHints_103), "EarlyHints_103" }, + { static_cast(HTTPStatusCode::OK_200), "OK_200" }, + { static_cast(HTTPStatusCode::Created_201), "Created_201" }, + { static_cast(HTTPStatusCode::Accepted_202), "Accepted_202" }, + { static_cast(HTTPStatusCode::NonAuthoritativeInformation_203), "NonAuthoritativeInformation_203" }, + { static_cast(HTTPStatusCode::NoContent_204), "NoContent_204" }, + { static_cast(HTTPStatusCode::ResetContent_205), "ResetContent_205" }, + { static_cast(HTTPStatusCode::PartialContent_206), "PartialContent_206" }, + { static_cast(HTTPStatusCode::MultiStatus_207), "MultiStatus_207" }, + { static_cast(HTTPStatusCode::AlreadyReported_208), "AlreadyReported_208" }, + { static_cast(HTTPStatusCode::IMUsed_226), "IMUsed_226" }, + { static_cast(HTTPStatusCode::MultipleChoices_300), "MultipleChoices_300" }, + { static_cast(HTTPStatusCode::MovedPermanently_301), "MovedPermanently_301" }, + { static_cast(HTTPStatusCode::Found_302), "Found_302" }, + { static_cast(HTTPStatusCode::SeeOther_303), "SeeOther_303" }, + { static_cast(HTTPStatusCode::NotModified_304), "NotModified_304" }, + { static_cast(HTTPStatusCode::UseProxy_305), "UseProxy_305" }, + { static_cast(HTTPStatusCode::unused_306), "unused_306" }, + { static_cast(HTTPStatusCode::TemporaryRedirect_307), "TemporaryRedirect_307" }, + { static_cast(HTTPStatusCode::PermanentRedirect_308), "PermanentRedirect_308" }, + { static_cast(HTTPStatusCode::BadRequest_400), "BadRequest_400" }, + { static_cast(HTTPStatusCode::Unauthorized_401), "Unauthorized_401" }, + { static_cast(HTTPStatusCode::PaymentRequired_402), "PaymentRequired_402" }, + { static_cast(HTTPStatusCode::Forbidden_403), "Forbidden_403" }, + { static_cast(HTTPStatusCode::NotFound_404), "NotFound_404" }, + { static_cast(HTTPStatusCode::MethodNotAllowed_405), "MethodNotAllowed_405" }, + { static_cast(HTTPStatusCode::NotAcceptable_406), "NotAcceptable_406" }, + { static_cast(HTTPStatusCode::ProxyAuthenticationRequired_407), "ProxyAuthenticationRequired_407" }, + { static_cast(HTTPStatusCode::RequestTimeout_408), "RequestTimeout_408" }, + { static_cast(HTTPStatusCode::Conflict_409), "Conflict_409" }, + { static_cast(HTTPStatusCode::Gone_410), "Gone_410" }, + { static_cast(HTTPStatusCode::LengthRequired_411), "LengthRequired_411" }, + { static_cast(HTTPStatusCode::PreconditionFailed_412), "PreconditionFailed_412" }, + { static_cast(HTTPStatusCode::PayloadTooLarge_413), "PayloadTooLarge_413" }, + { static_cast(HTTPStatusCode::UriTooLong_414), "UriTooLong_414" }, + { static_cast(HTTPStatusCode::UnsupportedMediaType_415), "UnsupportedMediaType_415" }, + { static_cast(HTTPStatusCode::RangeNotSatisfiable_416), "RangeNotSatisfiable_416" }, + { static_cast(HTTPStatusCode::ExpectationFailed_417), "ExpectationFailed_417" }, + { static_cast(HTTPStatusCode::ImATeapot_418), "ImATeapot_418" }, + { static_cast(HTTPStatusCode::MisdirectedRequest_421), "MisdirectedRequest_421" }, + { static_cast(HTTPStatusCode::UnprocessableContent_422), "UnprocessableContent_422" }, + { static_cast(HTTPStatusCode::Locked_423), "Locked_423" }, + { static_cast(HTTPStatusCode::FailedDependency_424), "FailedDependency_424" }, + { static_cast(HTTPStatusCode::TooEarly_425), "TooEarly_425" }, + { static_cast(HTTPStatusCode::UpgradeRequired_426), "UpgradeRequired_426" }, + { static_cast(HTTPStatusCode::PreconditionRequired_428), "PreconditionRequired_428" }, + { static_cast(HTTPStatusCode::TooManyRequests_429), "TooManyRequests_429" }, + { static_cast(HTTPStatusCode::RequestHeaderFieldsTooLarge_431), "RequestHeaderFieldsTooLarge_431" }, + { static_cast(HTTPStatusCode::UnavailableForLegalReasons_451), "UnavailableForLegalReasons_451" }, + { static_cast(HTTPStatusCode::InternalServerError_500), "InternalServerError_500" }, + { static_cast(HTTPStatusCode::NotImplemented_501), "NotImplemented_501" }, + { static_cast(HTTPStatusCode::BadGateway_502), "BadGateway_502" }, + { static_cast(HTTPStatusCode::ServiceUnavailable_503), "ServiceUnavailable_503" }, + { static_cast(HTTPStatusCode::GatewayTimeout_504), "GatewayTimeout_504" }, + { static_cast(HTTPStatusCode::HttpVersionNotSupported_505), "HttpVersionNotSupported_505" }, + { static_cast(HTTPStatusCode::VariantAlsoNegotiates_506), "VariantAlsoNegotiates_506" }, + { static_cast(HTTPStatusCode::InsufficientStorage_507), "InsufficientStorage_507" }, + { static_cast(HTTPStatusCode::LoopDetected_508), "LoopDetected_508" }, + { static_cast(HTTPStatusCode::NotExtended_510), "NotExtended_510" }, + { static_cast(HTTPStatusCode::NetworkAuthenticationRequired_511), "NetworkAuthenticationRequired_511" } + }; + return values; +} + +template<> +const char* EnumUtil::ToChars(HTTPStatusCode value) { + return StringUtil::EnumToString(GetHTTPStatusCodeValues(), 64, "HTTPStatusCode", static_cast(value)); +} + +template<> +HTTPStatusCode EnumUtil::FromString(const char *value) { + return static_cast(StringUtil::StringToEnum(GetHTTPStatusCodeValues(), 64, "HTTPStatusCode", value)); +} + const StringUtil::EnumStringLiteral *GetIndexAppendModeValues() { static constexpr StringUtil::EnumStringLiteral values[] { { static_cast(IndexAppendMode::DEFAULT), "DEFAULT" }, diff --git a/src/duckdb/src/common/http_util.cpp b/src/duckdb/src/common/http_util.cpp deleted file mode 100644 index 71248367e..000000000 --- a/src/duckdb/src/common/http_util.cpp +++ /dev/null @@ -1,29 +0,0 @@ -#include "duckdb/common/http_util.hpp" - -#include "duckdb/common/operator/cast_operators.hpp" -#include "duckdb/common/string_util.hpp" - -namespace duckdb { - -void HTTPUtil::ParseHTTPProxyHost(string &proxy_value, string &hostname_out, idx_t &port_out, idx_t default_port) { - auto sanitized_proxy_value = proxy_value; - if (StringUtil::StartsWith(proxy_value, "http://")) { - sanitized_proxy_value = proxy_value.substr(7); - } - auto proxy_split = StringUtil::Split(sanitized_proxy_value, ":"); - if (proxy_split.size() == 1) { - hostname_out = proxy_split[0]; - port_out = default_port; - } else if (proxy_split.size() == 2) { - idx_t port; - if (!TryCast::Operation(proxy_split[1], port, false)) { - throw InvalidInputException("Failed to parse port from http_proxy '%s'", proxy_value); - } - hostname_out = proxy_split[0]; - port_out = port; - } else { - throw InvalidInputException("Failed to parse http_proxy '%s' into a host and port", proxy_value); - } -} - -} // namespace duckdb diff --git a/src/duckdb/src/common/sort/partition_state.cpp b/src/duckdb/src/common/sort/partition_state.cpp index 6ae2f1c5a..73e462d54 100644 --- a/src/duckdb/src/common/sort/partition_state.cpp +++ b/src/duckdb/src/common/sort/partition_state.cpp @@ -581,6 +581,10 @@ class PartitionMergeTask : public ExecutorTask { TaskExecutionResult ExecuteTask(TaskExecutionMode mode) override; + string TaskType() const override { + return "PartitionMergeTask"; + } + private: struct ExecutorCallback : public PartitionGlobalMergeStates::Callback { explicit ExecutorCallback(Executor &executor) : executor(executor) { diff --git a/src/duckdb/src/execution/operator/aggregate/physical_hash_aggregate.cpp b/src/duckdb/src/execution/operator/aggregate/physical_hash_aggregate.cpp index 6636389ea..3d5c1632c 100644 --- a/src/duckdb/src/execution/operator/aggregate/physical_hash_aggregate.cpp +++ b/src/duckdb/src/execution/operator/aggregate/physical_hash_aggregate.cpp @@ -488,6 +488,10 @@ class HashAggregateFinalizeTask : public ExecutorTask { public: TaskExecutionResult ExecuteTask(TaskExecutionMode mode) override; + string TaskType() const override { + return "HashAggregateFinalizeTask"; + } + private: ClientContext &context; Pipeline &pipeline; @@ -547,6 +551,10 @@ class HashAggregateDistinctFinalizeTask : public ExecutorTask { public: TaskExecutionResult ExecuteTask(TaskExecutionMode mode) override; + string TaskType() const override { + return "HashAggregateDistinctFinalizeTask"; + } + private: TaskExecutionResult AggregateDistinctGrouping(const idx_t grouping_idx); diff --git a/src/duckdb/src/execution/operator/aggregate/physical_ungrouped_aggregate.cpp b/src/duckdb/src/execution/operator/aggregate/physical_ungrouped_aggregate.cpp index 693daee0b..318eca729 100644 --- a/src/duckdb/src/execution/operator/aggregate/physical_ungrouped_aggregate.cpp +++ b/src/duckdb/src/execution/operator/aggregate/physical_ungrouped_aggregate.cpp @@ -445,6 +445,10 @@ class UngroupedDistinctAggregateFinalizeTask : public ExecutorTask { TaskExecutionResult ExecuteTask(TaskExecutionMode mode) override; + string TaskType() const override { + return "UngroupedDistinctAggregateFinalizeTask"; + } + private: TaskExecutionResult AggregateDistinct(); diff --git a/src/duckdb/src/execution/operator/csv_scanner/util/csv_error.cpp b/src/duckdb/src/execution/operator/csv_scanner/util/csv_error.cpp index 4c419068c..c1bdda64c 100644 --- a/src/duckdb/src/execution/operator/csv_scanner/util/csv_error.cpp +++ b/src/duckdb/src/execution/operator/csv_scanner/util/csv_error.cpp @@ -330,10 +330,10 @@ CSVError CSVError::CastError(const CSVReaderOptions &options, const string &colu if (!options.WasTypeManuallySet(column_idx)) { how_to_fix_it << "This type was auto-detected from the CSV file." << '\n'; how_to_fix_it << "Possible solutions:" << '\n'; - how_to_fix_it << "* Override the type for this column manually by setting the type explicitly, e.g. types={'" + how_to_fix_it << "* Override the type for this column manually by setting the type explicitly, e.g., types={'" << column_name << "': 'VARCHAR'}" << '\n'; how_to_fix_it - << "* Set the sample size to a larger value to enable the auto-detection to scan more values, e.g. " + << "* Set the sample size to a larger value to enable the auto-detection to scan more values, e.g., " "sample_size=-1" << '\n'; how_to_fix_it << "* Use a COPY statement to automatically derive types from an existing table." << '\n'; diff --git a/src/duckdb/src/execution/operator/join/physical_hash_join.cpp b/src/duckdb/src/execution/operator/join/physical_hash_join.cpp index ede2e371a..2650056c8 100644 --- a/src/duckdb/src/execution/operator/join/physical_hash_join.cpp +++ b/src/duckdb/src/execution/operator/join/physical_hash_join.cpp @@ -469,6 +469,10 @@ class HashJoinTableInitTask : public ExecutorTask { return TaskExecutionResult::TASK_FINISHED; } + string TaskType() const override { + return "HashJoinTableInitTask"; + } + private: HashJoinGlobalSinkState &sink; idx_t entry_idx_from; @@ -524,6 +528,9 @@ class HashJoinFinalizeTask : public ExecutorTask { event->FinishTask(); return TaskExecutionResult::TASK_FINISHED; } + string TaskType() const override { + return "HashJoinFinalizeTask"; + } private: HashJoinGlobalSinkState &sink; @@ -607,6 +614,10 @@ class HashJoinRepartitionTask : public ExecutorTask { return TaskExecutionResult::TASK_FINISHED; } + string TaskType() const override { + return "HashJoinRepartitionTask"; + } + private: JoinHashTable &global_ht; JoinHashTable &local_ht; diff --git a/src/duckdb/src/execution/operator/join/physical_iejoin.cpp b/src/duckdb/src/execution/operator/join/physical_iejoin.cpp index 20a517c15..71c0e285c 100644 --- a/src/duckdb/src/execution/operator/join/physical_iejoin.cpp +++ b/src/duckdb/src/execution/operator/join/physical_iejoin.cpp @@ -1,3 +1,5 @@ +#include + #include "duckdb/execution/operator/join/physical_iejoin.hpp" #include "duckdb/common/atomic.hpp" @@ -18,10 +20,13 @@ namespace duckdb { PhysicalIEJoin::PhysicalIEJoin(LogicalComparisonJoin &op, PhysicalOperator &left, PhysicalOperator &right, - vector cond, JoinType join_type, idx_t estimated_cardinality) + vector cond, JoinType join_type, idx_t estimated_cardinality, + unique_ptr pushdown_info) : PhysicalRangeJoin(op, PhysicalOperatorType::IE_JOIN, left, right, std::move(cond), join_type, estimated_cardinality) { + filter_pushdown = std::move(pushdown_info); + // 1. let L1 (resp. L2) be the array of column X (resp. Y) D_ASSERT(conditions.size() >= 2); for (idx_t i = 0; i < 2; ++i) { @@ -61,27 +66,22 @@ PhysicalIEJoin::PhysicalIEJoin(LogicalComparisonJoin &op, PhysicalOperator &left } } +PhysicalIEJoin::PhysicalIEJoin(LogicalComparisonJoin &op, PhysicalOperator &left, PhysicalOperator &right, + vector cond, JoinType join_type, idx_t estimated_cardinality) + : PhysicalIEJoin(op, left, right, std::move(cond), join_type, estimated_cardinality, nullptr) { +} + //===--------------------------------------------------------------------===// // Sink //===--------------------------------------------------------------------===// -class IEJoinLocalState : public LocalSinkState { -public: - using LocalSortedTable = PhysicalRangeJoin::LocalSortedTable; - - IEJoinLocalState(ClientContext &context, const PhysicalRangeJoin &op, const idx_t child) - : table(context, op, child) { - } - - //! The local sort state - LocalSortedTable table; -}; +class IEJoinLocalState; class IEJoinGlobalState : public GlobalSinkState { public: using GlobalSortedTable = PhysicalRangeJoin::GlobalSortedTable; public: - IEJoinGlobalState(ClientContext &context, const PhysicalIEJoin &op) : child(0) { + IEJoinGlobalState(ClientContext &context, const PhysicalIEJoin &op) : child(1) { tables.resize(2); RowLayout lhs_layout; lhs_layout.Initialize(op.children[0].get().GetTypes()); @@ -94,28 +94,48 @@ class IEJoinGlobalState : public GlobalSinkState { vector rhs_order; rhs_order.emplace_back(op.rhs_orders[0].Copy()); tables[1] = make_uniq(context, rhs_order, rhs_layout, op); - } - IEJoinGlobalState(IEJoinGlobalState &prev) : tables(std::move(prev.tables)), child(prev.child + 1) { - state = prev.state; + if (op.filter_pushdown) { + skip_filter_pushdown = op.filter_pushdown->probe_info.empty(); + global_filter_state = op.filter_pushdown->GetGlobalState(context, op); + } } - void Sink(DataChunk &input, IEJoinLocalState &lstate) { - auto &table = *tables[child]; - auto &global_sort_state = table.global_sort_state; - auto &local_sort_state = lstate.table.local_sort_state; + void Sink(DataChunk &input, IEJoinLocalState &lstate); + void Finalize(Pipeline &pipeline, Event &event) { + // Sort the current input child + D_ASSERT(child < tables.size()); + tables[child]->Finalize(pipeline, event); + child = child ? 0 : 2; + skip_filter_pushdown = true; + }; - // Sink the data into the local sort state - lstate.table.Sink(input, global_sort_state); + //! The two input tables (IEJoin materialises both sides) + vector> tables; + //! The child that is being materialised (right/1 then left/0) + size_t child; + //! Should we not bother pushing down filters? + bool skip_filter_pushdown = false; + //! The global filter states to push down (if any) + unique_ptr global_filter_state; +}; - // When sorting data reaches a certain size, we sort it - if (local_sort_state.SizeInBytes() >= table.memory_per_thread) { - local_sort_state.Sort(global_sort_state, true); +class IEJoinLocalState : public LocalSinkState { +public: + using LocalSortedTable = PhysicalRangeJoin::LocalSortedTable; + + IEJoinLocalState(ClientContext &context, const PhysicalRangeJoin &op, IEJoinGlobalState &gstate) + : table(context, op, gstate.child) { + + if (op.filter_pushdown) { + local_filter_state = op.filter_pushdown->GetLocalState(*gstate.global_filter_state); } } - vector> tables; - size_t child; + //! The local sort state + LocalSortedTable table; + //! Local state for accumulating filter statistics + unique_ptr local_filter_state; }; unique_ptr PhysicalIEJoin::GetGlobalSinkState(ClientContext &context) const { @@ -124,12 +144,22 @@ unique_ptr PhysicalIEJoin::GetGlobalSinkState(ClientContext &co } unique_ptr PhysicalIEJoin::GetLocalSinkState(ExecutionContext &context) const { - idx_t sink_child = 0; - if (sink_state) { - const auto &ie_sink = sink_state->Cast(); - sink_child = ie_sink.child; + auto &ie_sink = sink_state->Cast(); + return make_uniq(context.client, *this, ie_sink); +} + +void IEJoinGlobalState::Sink(DataChunk &input, IEJoinLocalState &lstate) { + auto &table = *tables[child]; + auto &global_sort_state = table.global_sort_state; + auto &local_sort_state = lstate.table.local_sort_state; + + // Sink the data into the local sort state + lstate.table.Sink(input, global_sort_state); + + // When sorting data reaches a certain size, we sort it + if (local_sort_state.SizeInBytes() >= table.memory_per_thread) { + local_sort_state.Sort(global_sort_state, true); } - return make_uniq(context.client, *this, sink_child); } SinkResultType PhysicalIEJoin::Sink(ExecutionContext &context, DataChunk &chunk, OperatorSinkInput &input) const { @@ -138,6 +168,10 @@ SinkResultType PhysicalIEJoin::Sink(ExecutionContext &context, DataChunk &chunk, gstate.Sink(chunk, lstate); + if (filter_pushdown && !gstate.skip_filter_pushdown) { + filter_pushdown->Sink(lstate.table.keys, *lstate.local_filter_state); + } + return SinkResultType::NEED_MORE_INPUT; } @@ -150,6 +184,10 @@ SinkCombineResultType PhysicalIEJoin::Combine(ExecutionContext &context, Operato context.thread.profiler.Flush(*this); client_profiler.Flush(context.thread.profiler); + if (filter_pushdown && !gstate.skip_filter_pushdown) { + filter_pushdown->Combine(*gstate.global_filter_state, *lstate.local_filter_state); + } + return SinkCombineResultType::FINISHED; } @@ -159,6 +197,9 @@ SinkCombineResultType PhysicalIEJoin::Combine(ExecutionContext &context, Operato SinkFinalizeType PhysicalIEJoin::Finalize(Pipeline &pipeline, Event &event, ClientContext &context, OperatorSinkFinalizeInput &input) const { auto &gstate = input.global_state.Cast(); + if (filter_pushdown && !gstate.skip_filter_pushdown) { + (void)filter_pushdown->Finalize(context, nullptr, *gstate.global_filter_state, *this); + } auto &table = *gstate.tables[gstate.child]; auto &global_sort_state = table.global_sort_state; @@ -171,11 +212,8 @@ SinkFinalizeType PhysicalIEJoin::Finalize(Pipeline &pipeline, Event &event, Clie return SinkFinalizeType::NO_OUTPUT_POSSIBLE; } - // Sort the current input child - table.Finalize(pipeline, event); - // Move to the next input child - ++gstate.child; + gstate.Finalize(pipeline, event); return SinkFinalizeType::READY; } @@ -1041,16 +1079,16 @@ void PhysicalIEJoin::BuildPipelines(Pipeline ¤t, MetaPipeline &meta_pipeli // Create one child meta pipeline that will hold the LHS and RHS pipelines auto &child_meta_pipeline = meta_pipeline.CreateChildMetaPipeline(current, *this); - // Build out LHS - auto lhs_pipeline = child_meta_pipeline.GetBasePipeline(); - children[0].get().BuildPipelines(*lhs_pipeline, child_meta_pipeline); + // Build out RHS first because that is the order the join planner expects. + auto rhs_pipeline = child_meta_pipeline.GetBasePipeline(); + children[1].get().BuildPipelines(*rhs_pipeline, child_meta_pipeline); - // Build out RHS - auto &rhs_pipeline = child_meta_pipeline.CreatePipeline(); - children[1].get().BuildPipelines(rhs_pipeline, child_meta_pipeline); + // Build out LHS + auto &lhs_pipeline = child_meta_pipeline.CreatePipeline(); + children[0].get().BuildPipelines(lhs_pipeline, child_meta_pipeline); - // Despite having the same sink, RHS and everything created after it need their own (same) PipelineFinishEvent - child_meta_pipeline.AddFinishEvent(rhs_pipeline); + // Despite having the same sink, LHS and everything created after it need their own (same) PipelineFinishEvent + child_meta_pipeline.AddFinishEvent(lhs_pipeline); } } // namespace duckdb diff --git a/src/duckdb/src/execution/operator/join/physical_range_join.cpp b/src/duckdb/src/execution/operator/join/physical_range_join.cpp index bddbd86b9..1d98918c2 100644 --- a/src/duckdb/src/execution/operator/join/physical_range_join.cpp +++ b/src/duckdb/src/execution/operator/join/physical_range_join.cpp @@ -102,6 +102,10 @@ class RangeJoinMergeTask : public ExecutorTask { return TaskExecutionResult::TASK_FINISHED; } + string TaskType() const override { + return "RangeJoinMergeTask"; + } + private: ClientContext &context; GlobalSortedTable &table; diff --git a/src/duckdb/src/execution/operator/order/physical_order.cpp b/src/duckdb/src/execution/operator/order/physical_order.cpp index c3c043466..be861d9dd 100644 --- a/src/duckdb/src/execution/operator/order/physical_order.cpp +++ b/src/duckdb/src/execution/operator/order/physical_order.cpp @@ -127,6 +127,10 @@ class PhysicalOrderMergeTask : public ExecutorTask { return TaskExecutionResult::TASK_FINISHED; } + string TaskType() const override { + return "PhysicalOrderMergeTask"; + } + private: ClientContext &context; OrderGlobalSinkState &state; diff --git a/src/duckdb/src/execution/operator/persistent/physical_batch_copy_to_file.cpp b/src/duckdb/src/execution/operator/persistent/physical_batch_copy_to_file.cpp index 86560e256..69ba3d252 100644 --- a/src/duckdb/src/execution/operator/persistent/physical_batch_copy_to_file.cpp +++ b/src/duckdb/src/execution/operator/persistent/physical_batch_copy_to_file.cpp @@ -265,6 +265,10 @@ class ProcessRemainingBatchesTask : public ExecutorTask { return TaskExecutionResult::TASK_FINISHED; } + string TaskType() const override { + return "ProcessRemainingBatchesTask"; + } + private: const PhysicalBatchCopyToFile &op; FixedBatchCopyGlobalState &gstate; diff --git a/src/duckdb/src/execution/physical_plan/plan_comparison_join.cpp b/src/duckdb/src/execution/physical_plan/plan_comparison_join.cpp index b6f4125ef..d3ad37204 100644 --- a/src/duckdb/src/execution/physical_plan/plan_comparison_join.cpp +++ b/src/duckdb/src/execution/physical_plan/plan_comparison_join.cpp @@ -84,7 +84,8 @@ PhysicalOperator &PhysicalPlanGenerator::PlanComparisonJoin(LogicalComparisonJoi } if (can_iejoin) { - return Make(op, left, right, std::move(op.conditions), op.join_type, op.estimated_cardinality); + return Make(op, left, right, std::move(op.conditions), op.join_type, op.estimated_cardinality, + std::move(op.filter_pushdown)); } if (can_merge) { // range join: use piecewise merge join diff --git a/src/duckdb/src/function/table/version/pragma_version.cpp b/src/duckdb/src/function/table/version/pragma_version.cpp index 8f8a89f7e..56057312c 100644 --- a/src/duckdb/src/function/table/version/pragma_version.cpp +++ b/src/duckdb/src/function/table/version/pragma_version.cpp @@ -1,5 +1,5 @@ #ifndef DUCKDB_PATCH_VERSION -#define DUCKDB_PATCH_VERSION "0-dev3269" +#define DUCKDB_PATCH_VERSION "0-dev3294" #endif #ifndef DUCKDB_MINOR_VERSION #define DUCKDB_MINOR_VERSION 3 @@ -8,10 +8,10 @@ #define DUCKDB_MAJOR_VERSION 1 #endif #ifndef DUCKDB_VERSION -#define DUCKDB_VERSION "v1.3.0-dev3269" +#define DUCKDB_VERSION "v1.3.0-dev3294" #endif #ifndef DUCKDB_SOURCE_ID -#define DUCKDB_SOURCE_ID "e144689926" +#define DUCKDB_SOURCE_ID "30e907b52f" #endif #include "duckdb/function/table/system_functions.hpp" #include "duckdb/main/database.hpp" diff --git a/src/duckdb/src/include/duckdb/common/arrow/arrow_merge_event.hpp b/src/duckdb/src/include/duckdb/common/arrow/arrow_merge_event.hpp index d6608fd7c..eefc58e4e 100644 --- a/src/duckdb/src/include/duckdb/common/arrow/arrow_merge_event.hpp +++ b/src/duckdb/src/include/duckdb/common/arrow/arrow_merge_event.hpp @@ -34,6 +34,10 @@ class ArrowBatchTask : public ExecutorTask { void ProduceRecordBatches(); TaskExecutionResult ExecuteTask(TaskExecutionMode mode) override; + string TaskType() const override { + return "ArrowBatchTask"; + } + private: ArrowQueryResult &result; vector record_batch_indices; diff --git a/src/duckdb/src/include/duckdb/common/enum_util.hpp b/src/duckdb/src/include/duckdb/common/enum_util.hpp index 5dd4c01ff..12faadc82 100644 --- a/src/duckdb/src/include/duckdb/common/enum_util.hpp +++ b/src/duckdb/src/include/duckdb/common/enum_util.hpp @@ -196,6 +196,8 @@ enum class GateStatus : uint8_t; enum class HLLStorageType : uint8_t; +enum class HTTPStatusCode : uint16_t; + enum class IndexAppendMode : uint8_t; enum class IndexConstraintType : uint8_t; @@ -655,6 +657,9 @@ const char* EnumUtil::ToChars(GateStatus value); template<> const char* EnumUtil::ToChars(HLLStorageType value); +template<> +const char* EnumUtil::ToChars(HTTPStatusCode value); + template<> const char* EnumUtil::ToChars(IndexAppendMode value); @@ -1220,6 +1225,9 @@ GateStatus EnumUtil::FromString(const char *value); template<> HLLStorageType EnumUtil::FromString(const char *value); +template<> +HTTPStatusCode EnumUtil::FromString(const char *value); + template<> IndexAppendMode EnumUtil::FromString(const char *value); diff --git a/src/duckdb/src/include/duckdb/common/exception/http_exception.hpp b/src/duckdb/src/include/duckdb/common/exception/http_exception.hpp index 87d11d139..aff00d23d 100644 --- a/src/duckdb/src/include/duckdb/common/exception/http_exception.hpp +++ b/src/duckdb/src/include/duckdb/common/exception/http_exception.hpp @@ -25,7 +25,8 @@ class HTTPException : public Exception { template ::status = 0, typename... ARGS> explicit HTTPException(RESPONSE &response, const string &msg, ARGS... params) - : HTTPException(response.status, response.body, response.headers, response.reason, msg, params...) { + : HTTPException(static_cast(response.status), response.body, response.headers, response.reason, msg, + params...) { } template @@ -35,7 +36,8 @@ class HTTPException : public Exception { template ::code = 0, typename... ARGS> explicit HTTPException(RESPONSE &response, const string &msg, ARGS... params) - : HTTPException(response.code, response.body, response.headers, response.error, msg, params...) { + : HTTPException(static_cast(response.code), response.body, response.headers, response.error, msg, + params...) { } template diff --git a/src/duckdb/src/include/duckdb/common/http_util.hpp b/src/duckdb/src/include/duckdb/common/http_util.hpp index 766534c30..f2fed4464 100644 --- a/src/duckdb/src/include/duckdb/common/http_util.hpp +++ b/src/duckdb/src/include/duckdb/common/http_util.hpp @@ -9,11 +9,153 @@ #pragma once #include "duckdb/common/types.hpp" +#include "duckdb/common/map.hpp" namespace duckdb { +class DatabaseInstance; +class HTTPLogger; + +enum class HTTPStatusCode : uint16_t { + INVALID = 0, + // Information responses + Continue_100 = 100, + SwitchingProtocol_101 = 101, + Processing_102 = 102, + EarlyHints_103 = 103, + + // Successful responses + OK_200 = 200, + Created_201 = 201, + Accepted_202 = 202, + NonAuthoritativeInformation_203 = 203, + NoContent_204 = 204, + ResetContent_205 = 205, + PartialContent_206 = 206, + MultiStatus_207 = 207, + AlreadyReported_208 = 208, + IMUsed_226 = 226, + + // Redirection messages + MultipleChoices_300 = 300, + MovedPermanently_301 = 301, + Found_302 = 302, + SeeOther_303 = 303, + NotModified_304 = 304, + UseProxy_305 = 305, + unused_306 = 306, + TemporaryRedirect_307 = 307, + PermanentRedirect_308 = 308, + + // Client error responses + BadRequest_400 = 400, + Unauthorized_401 = 401, + PaymentRequired_402 = 402, + Forbidden_403 = 403, + NotFound_404 = 404, + MethodNotAllowed_405 = 405, + NotAcceptable_406 = 406, + ProxyAuthenticationRequired_407 = 407, + RequestTimeout_408 = 408, + Conflict_409 = 409, + Gone_410 = 410, + LengthRequired_411 = 411, + PreconditionFailed_412 = 412, + PayloadTooLarge_413 = 413, + UriTooLong_414 = 414, + UnsupportedMediaType_415 = 415, + RangeNotSatisfiable_416 = 416, + ExpectationFailed_417 = 417, + ImATeapot_418 = 418, + MisdirectedRequest_421 = 421, + UnprocessableContent_422 = 422, + Locked_423 = 423, + FailedDependency_424 = 424, + TooEarly_425 = 425, + UpgradeRequired_426 = 426, + PreconditionRequired_428 = 428, + TooManyRequests_429 = 429, + RequestHeaderFieldsTooLarge_431 = 431, + UnavailableForLegalReasons_451 = 451, + + // Server error responses + InternalServerError_500 = 500, + NotImplemented_501 = 501, + BadGateway_502 = 502, + ServiceUnavailable_503 = 503, + GatewayTimeout_504 = 504, + HttpVersionNotSupported_505 = 505, + VariantAlsoNegotiates_506 = 506, + InsufficientStorage_507 = 507, + LoopDetected_508 = 508, + NotExtended_510 = 510, + NetworkAuthenticationRequired_511 = 511, +}; + +struct HTTPHeaders { + using header_map_t = std::multimap; + +public: + HTTPHeaders() = default; + explicit HTTPHeaders(DatabaseInstance &db); + + void Insert(string key, string value); + bool HasHeader(const string &key) const; + string GetHeaderValue(const string &key) const; + + header_map_t::iterator begin() { // NOLINT: match stl API + return headers.begin(); + } + header_map_t::iterator end() { // NOLINT: match stl API + return headers.end(); + } + header_map_t::const_iterator begin() const { // NOLINT: match stl API + return headers.begin(); + } + header_map_t::const_iterator end() const { // NOLINT: match stl API + return headers.end(); + } + header_map_t::const_iterator cbegin() const { // NOLINT: match stl API + return headers.begin(); + } + header_map_t::const_iterator cend() const { // NOLINT: match stl API + return headers.end(); + } + +private: + header_map_t headers; +}; + +struct HTTPResponse { + explicit HTTPResponse(HTTPStatusCode code); + + HTTPStatusCode status; + string body; + string request_error; + string reason; + HTTPHeaders headers; + bool success = true; + +public: + bool HasHeader(const string &key) const; + string GetHeaderValue(const string &key) const; + + bool Success() const; + + bool HasRequestError() const; + const string &GetRequestError() const; +}; class HTTPUtil { public: + virtual ~HTTPUtil() = default; + +public: + static HTTPUtil &Get(DatabaseInstance &db); + + virtual unique_ptr Request(DatabaseInstance &db, const string &url, const HTTPHeaders &headers, + optional_ptr http_logger); + static void ParseHTTPProxyHost(string &proxy_value, string &hostname_out, idx_t &port_out, idx_t default_port = 80); + static HTTPStatusCode ToStatusCode(int32_t status_code); }; } // namespace duckdb diff --git a/src/duckdb/src/include/duckdb/common/multi_file/union_by_name.hpp b/src/duckdb/src/include/duckdb/common/multi_file/union_by_name.hpp index ac43e59e0..e6b6c7d6e 100644 --- a/src/duckdb/src/include/duckdb/common/multi_file/union_by_name.hpp +++ b/src/duckdb/src/include/duckdb/common/multi_file/union_by_name.hpp @@ -32,6 +32,10 @@ class UnionByReaderTask : public BaseExecutorTask { readers[file_idx] = OP::GetUnionData(std::move(reader), file_idx); } + string TaskType() const override { + return "UnionByReaderTask"; + } + private: ClientContext &context; const OpenFileInfo &file; diff --git a/src/duckdb/src/include/duckdb/execution/operator/join/physical_iejoin.hpp b/src/duckdb/src/include/duckdb/execution/operator/join/physical_iejoin.hpp index b2c2f7a4a..545a6f49c 100644 --- a/src/duckdb/src/include/duckdb/execution/operator/join/physical_iejoin.hpp +++ b/src/duckdb/src/include/duckdb/execution/operator/join/physical_iejoin.hpp @@ -20,6 +20,9 @@ class PhysicalIEJoin : public PhysicalRangeJoin { static constexpr const PhysicalOperatorType TYPE = PhysicalOperatorType::IE_JOIN; public: + PhysicalIEJoin(LogicalComparisonJoin &op, PhysicalOperator &left, PhysicalOperator &right, + vector cond, JoinType join_type, idx_t estimated_cardinality, + unique_ptr pushdown_info); PhysicalIEJoin(LogicalComparisonJoin &op, PhysicalOperator &left, PhysicalOperator &right, vector cond, JoinType join_type, idx_t estimated_cardinality); diff --git a/src/duckdb/src/include/duckdb/main/config.hpp b/src/duckdb/src/include/duckdb/main/config.hpp index 6fd9d6786..523fba639 100644 --- a/src/duckdb/src/include/duckdb/main/config.hpp +++ b/src/duckdb/src/include/duckdb/main/config.hpp @@ -55,6 +55,7 @@ class ExtensionCallback; class SecretManager; class CompressionInfo; class EncryptionUtil; +class HTTPUtil; struct CompressionFunctionSet; struct DatabaseCacheEntry; @@ -349,6 +350,8 @@ struct DBConfig { vector> extension_callbacks; //! Encryption Util for OpenSSL shared_ptr encryption_util; + //! HTTP Request utility functions + unique_ptr http_util; //! Reference to the database cache entry (if any) shared_ptr db_cache_entry; diff --git a/src/duckdb/src/include/duckdb/main/database_manager.hpp b/src/duckdb/src/include/duckdb/main/database_manager.hpp index dec1c89f0..9b18c5d18 100644 --- a/src/duckdb/src/include/duckdb/main/database_manager.hpp +++ b/src/duckdb/src/include/duckdb/main/database_manager.hpp @@ -68,6 +68,8 @@ class DatabaseManager { //! Scans the catalog set and adds each committed database entry, and each database entry of the current //! transaction, to a vector holding AttachedDatabase references vector> GetDatabases(ClientContext &context); + //! Scans the catalog set and returns each committed database entry + vector> GetDatabases(); //! Removes all databases from the catalog set. This is necessary for the database instance's destructor, //! as the database manager has to be alive when destroying the catalog set objects. void ResetDatabases(unique_ptr &scheduler); diff --git a/src/duckdb/src/include/duckdb/main/extension/generated_extension_loader.hpp b/src/duckdb/src/include/duckdb/main/extension/generated_extension_loader.hpp index eb98d38cb..70bbf455f 100644 --- a/src/duckdb/src/include/duckdb/main/extension/generated_extension_loader.hpp +++ b/src/duckdb/src/include/duckdb/main/extension/generated_extension_loader.hpp @@ -21,8 +21,8 @@ namespace duckdb { //! Looks through the CMake-generated list of extensions that are linked into DuckDB currently to try load bool TryLoadLinkedExtension(DuckDB &db, const string &extension); -const vector &LinkedExtensions(); -const vector &LoadedExtensionTestPaths(); +vector LinkedExtensions(); +vector LoadedExtensionTestPaths(); } // namespace duckdb #endif diff --git a/src/duckdb/src/include/duckdb/parallel/pipeline.hpp b/src/duckdb/src/include/duckdb/parallel/pipeline.hpp index dd1614145..b6b20d7db 100644 --- a/src/duckdb/src/include/duckdb/parallel/pipeline.hpp +++ b/src/duckdb/src/include/duckdb/parallel/pipeline.hpp @@ -33,6 +33,10 @@ class PipelineTask : public ExecutorTask { Pipeline &pipeline; unique_ptr pipeline_executor; + string TaskType() const override { + return "PipelineTask"; + } + public: const PipelineExecutor &GetPipelineExecutor() const; bool TaskBlockedOnResult() const override; diff --git a/src/duckdb/src/include/duckdb/parallel/task.hpp b/src/duckdb/src/include/duckdb/parallel/task.hpp index a18b72e2f..1beb11437 100644 --- a/src/duckdb/src/include/duckdb/parallel/task.hpp +++ b/src/duckdb/src/include/duckdb/parallel/task.hpp @@ -52,6 +52,10 @@ class Task : public enable_shared_from_this { return false; } + virtual string TaskType() const { + return "UnnamedTask"; + } + public: optional_ptr token; }; diff --git a/src/duckdb/src/main/database.cpp b/src/duckdb/src/main/database.cpp index dc05baffa..7e0e52d4e 100644 --- a/src/duckdb/src/main/database.cpp +++ b/src/duckdb/src/main/database.cpp @@ -29,6 +29,7 @@ #include "duckdb/storage/external_file_cache.hpp" #include "duckdb/storage/compression/empty_validity.hpp" #include "duckdb/logging/logger.hpp" +#include "duckdb/common/http_util.hpp" #ifndef DUCKDB_NO_THREADS #include "duckdb/common/thread.hpp" @@ -47,6 +48,7 @@ DBConfig::DBConfig() { index_types = make_uniq(); error_manager = make_uniq(); secret_manager = make_uniq(); + http_util = make_uniq(); } DBConfig::DBConfig(bool read_only) : DBConfig::DBConfig() { diff --git a/src/duckdb/src/main/database_manager.cpp b/src/duckdb/src/main/database_manager.cpp index 0804fc5ee..c1ac5c4fa 100644 --- a/src/duckdb/src/main/database_manager.cpp +++ b/src/duckdb/src/main/database_manager.cpp @@ -230,6 +230,13 @@ vector> DatabaseManager::GetDatabases(ClientContext return result; } +vector> DatabaseManager::GetDatabases() { + vector> result; + databases->Scan([&](CatalogEntry &entry) { result.push_back(entry.Cast()); }); + result.push_back(*system); + return result; +} + void DatabaseManager::ResetDatabases(unique_ptr &scheduler) { vector> result; databases->Scan([&](CatalogEntry &entry) { result.push_back(entry.Cast()); }); diff --git a/src/duckdb/src/main/extension/extension_install.cpp b/src/duckdb/src/main/extension/extension_install.cpp index fea10d0b4..8e4907bb3 100644 --- a/src/duckdb/src/main/extension/extension_install.cpp +++ b/src/duckdb/src/main/extension/extension_install.cpp @@ -12,15 +12,6 @@ #include "duckdb/main/secret/secret.hpp" #include "duckdb/main/secret/secret_manager.hpp" -#ifndef DISABLE_DUCKDB_REMOTE_INSTALL -#ifndef DUCKDB_DISABLE_EXTENSION_LOAD -#include "httplib.hpp" -#ifndef DUCKDB_NO_THREADS -#include -#include -#endif -#endif -#endif #include "duckdb/common/windows_undefs.hpp" #include @@ -354,17 +345,6 @@ static unique_ptr InstallFromHttpUrl(DatabaseInstance &db, const string &local_extension_path, ExtensionInstallOptions &options, optional_ptr http_logger) { - string no_http = StringUtil::Replace(url, "http://", ""); - - idx_t next = no_http.find('/', 0); - if (next == string::npos) { - throw IOException("No slash in URL template"); - } - - // Push the substring [last, next) on to splits - auto hostname_without_http = no_http.substr(0, next); - auto url_local_part = no_http.substr(next); - unique_ptr install_info; { auto fs = FileSystem::CreateLocal(); @@ -381,98 +361,41 @@ static unique_ptr InstallFromHttpUrl(DatabaseInstance &db, } } - auto url_base = "http://" + hostname_without_http; - // FIXME: the retry logic should be unified with the retry logic in the httpfs client - static constexpr idx_t MAX_RETRY_COUNT = 3; - static constexpr uint64_t RETRY_WAIT_MS = 100; - static constexpr double RETRY_BACKOFF = 4; - idx_t retry_count = 0; - duckdb_httplib::Result res; - while (true) { - duckdb_httplib::Client cli(url_base.c_str()); - if (!db.config.options.http_proxy.empty()) { - idx_t port; - string host; - HTTPUtil::ParseHTTPProxyHost(db.config.options.http_proxy, host, port); - cli.set_proxy(host, NumericCast(port)); - } - - if (!db.config.options.http_proxy_username.empty() || !db.config.options.http_proxy_password.empty()) { - cli.set_proxy_basic_auth(db.config.options.http_proxy_username, db.config.options.http_proxy_password); - } - - if (http_logger) { - cli.set_logger(http_logger->GetLogger()); - } - - duckdb_httplib::Headers headers = { - {"User-Agent", StringUtil::Format("%s %s", db.config.UserAgent(), DuckDB::SourceID())}}; - - if (options.use_etags && install_info && !install_info->etag.empty()) { - headers.insert({"If-None-Match", StringUtil::Format("%s", install_info->etag)}); - } - - res = cli.Get(url_local_part.c_str(), headers); - if (install_info && res && res->status == 304) { - return install_info; - } + HTTPHeaders headers(db); + if (options.use_etags && install_info && !install_info->etag.empty()) { + headers.Insert("If-None-Match", StringUtil::Format("%s", install_info->etag)); + } + auto &http_util = HTTPUtil::Get(db); + auto response = http_util.Request(db, url, headers, http_logger); + if (!response->Success()) { + // if we should not retry or exceeded the number of retries - bubble up the error + string message; + ExtensionHelper::CreateSuggestions(extension_name, message); - if (res && res->status == 200) { - // success! - break; + auto documentation_link = ExtensionHelper::ExtensionInstallDocumentationLink(extension_name); + if (!documentation_link.empty()) { + message += "\nFor more info, visit " + documentation_link; } - // failure - check if we should retry - bool should_retry = false; - if (res.error() == duckdb_httplib::Error::Success) { - switch (res->status) { - case 408: // Request Timeout - case 418: // Server is pretending to be a teapot - case 429: // Rate limiter hit - case 500: // Server has error - case 503: // Server has error - case 504: // Server has error - should_retry = true; - break; - default: - break; - } - } else { - // always retry on duckdb_httplib::Error::Error - should_retry = true; + if (response->HasRequestError()) { + // request error - this means something went wrong performing the request + throw IOException("Failed to download extension \"%s\" at URL \"%s\"\n%s (ERROR %s)", extension_name, url, + message, response->GetRequestError()); } - retry_count++; - if (!should_retry || retry_count >= MAX_RETRY_COUNT) { - // if we should not retry or exceeded the number of retries - bubble up the error - string message; - ExtensionHelper::CreateSuggestions(extension_name, message); - - auto documentation_link = ExtensionHelper::ExtensionInstallDocumentationLink(extension_name); - if (!documentation_link.empty()) { - message += "\nFor more info, visit " + documentation_link; - } - if (res.error() == duckdb_httplib::Error::Success) { - throw HTTPException(res.value(), "Failed to download extension \"%s\" at URL \"%s%s\" (HTTP %n)\n%s", - extension_name, url_base, url_local_part, res->status, message); - } else { - throw IOException("Failed to download extension \"%s\" at URL \"%s%s\"\n%s (ERROR %s)", extension_name, - url_base, url_local_part, message, to_string(res.error())); - } - } -#ifndef DUCKDB_NO_THREADS - // retry - // sleep first - uint64_t sleep_amount = static_cast(static_cast(RETRY_WAIT_MS) * - pow(RETRY_BACKOFF, static_cast(retry_count) - 1)); - std::this_thread::sleep_for(std::chrono::milliseconds(sleep_amount)); -#endif + // if this was not a request error this means the server responded - report the response status and response + throw HTTPException(*response, "Failed to download extension \"%s\" at URL \"%s\" (HTTP %n)\n%s", + extension_name, url, int(response->status), message); + } + if (response->status == HTTPStatusCode::NotModified_304 && install_info) { + return install_info; } - auto decompressed_body = GZipFileSystem::UncompressGZIPString(res->body); + + auto decompressed_body = GZipFileSystem::UncompressGZIPString(response->body); ExtensionInstallInfo info; CheckExtensionMetadataOnInstall(db, (void *)decompressed_body.data(), decompressed_body.size(), info, extension_name); - if (res->has_header("ETag")) { - info.etag = res->get_header_value("ETag"); + if (response->HasHeader("ETag")) { + info.etag = response->GetHeaderValue("ETag"); } if (options.repository) { diff --git a/src/duckdb/src/main/http/http_util.cpp b/src/duckdb/src/main/http/http_util.cpp new file mode 100644 index 000000000..e01adb3c5 --- /dev/null +++ b/src/duckdb/src/main/http/http_util.cpp @@ -0,0 +1,322 @@ +#include "duckdb/common/http_util.hpp" +#include "duckdb/main/database.hpp" +#include "duckdb/common/operator/cast_operators.hpp" +#include "duckdb/common/string_util.hpp" +#include "duckdb/logging/http_logger.hpp" +#ifndef DISABLE_DUCKDB_REMOTE_INSTALL +#ifndef DUCKDB_DISABLE_EXTENSION_LOAD +#include "httplib.hpp" +#endif +#endif +#ifndef DUCKDB_NO_THREADS +#include +#include +#endif + +namespace duckdb { + +HTTPHeaders::HTTPHeaders(DatabaseInstance &db) { + headers.insert({"User-Agent", StringUtil::Format("%s %s", db.config.UserAgent(), DuckDB::SourceID())}); +} + +void HTTPHeaders::Insert(string key, string value) { + headers.insert(make_pair(std::move(key), std::move(value))); +} + +bool HTTPHeaders::HasHeader(const string &key) const { + return headers.find(key) != headers.end(); +} + +string HTTPHeaders::GetHeaderValue(const string &key) const { + auto entry = headers.find(key); + if (entry == headers.end()) { + throw InternalException("Header value not found"); + } + return entry->second; +} + +HTTPStatusCode HTTPUtil::ToStatusCode(int32_t status_code) { + switch (status_code) { + case 100: + return HTTPStatusCode::Continue_100; + case 101: + return HTTPStatusCode::SwitchingProtocol_101; + case 102: + return HTTPStatusCode::Processing_102; + case 103: + return HTTPStatusCode::EarlyHints_103; + case 200: + return HTTPStatusCode::OK_200; + case 201: + return HTTPStatusCode::Created_201; + case 202: + return HTTPStatusCode::Accepted_202; + case 203: + return HTTPStatusCode::NonAuthoritativeInformation_203; + case 204: + return HTTPStatusCode::NoContent_204; + case 205: + return HTTPStatusCode::ResetContent_205; + case 206: + return HTTPStatusCode::PartialContent_206; + case 207: + return HTTPStatusCode::MultiStatus_207; + case 208: + return HTTPStatusCode::AlreadyReported_208; + case 226: + return HTTPStatusCode::IMUsed_226; + case 300: + return HTTPStatusCode::MultipleChoices_300; + case 301: + return HTTPStatusCode::MovedPermanently_301; + case 302: + return HTTPStatusCode::Found_302; + case 303: + return HTTPStatusCode::SeeOther_303; + case 304: + return HTTPStatusCode::NotModified_304; + case 305: + return HTTPStatusCode::UseProxy_305; + case 306: + return HTTPStatusCode::unused_306; + case 307: + return HTTPStatusCode::TemporaryRedirect_307; + case 308: + return HTTPStatusCode::PermanentRedirect_308; + case 400: + return HTTPStatusCode::BadRequest_400; + case 401: + return HTTPStatusCode::Unauthorized_401; + case 402: + return HTTPStatusCode::PaymentRequired_402; + case 403: + return HTTPStatusCode::Forbidden_403; + case 404: + return HTTPStatusCode::NotFound_404; + case 405: + return HTTPStatusCode::MethodNotAllowed_405; + case 406: + return HTTPStatusCode::NotAcceptable_406; + case 407: + return HTTPStatusCode::ProxyAuthenticationRequired_407; + case 408: + return HTTPStatusCode::RequestTimeout_408; + case 409: + return HTTPStatusCode::Conflict_409; + case 410: + return HTTPStatusCode::Gone_410; + case 411: + return HTTPStatusCode::LengthRequired_411; + case 412: + return HTTPStatusCode::PreconditionFailed_412; + case 413: + return HTTPStatusCode::PayloadTooLarge_413; + case 414: + return HTTPStatusCode::UriTooLong_414; + case 415: + return HTTPStatusCode::UnsupportedMediaType_415; + case 416: + return HTTPStatusCode::RangeNotSatisfiable_416; + case 417: + return HTTPStatusCode::ExpectationFailed_417; + case 418: + return HTTPStatusCode::ImATeapot_418; + case 421: + return HTTPStatusCode::MisdirectedRequest_421; + case 422: + return HTTPStatusCode::UnprocessableContent_422; + case 423: + return HTTPStatusCode::Locked_423; + case 424: + return HTTPStatusCode::FailedDependency_424; + case 425: + return HTTPStatusCode::TooEarly_425; + case 426: + return HTTPStatusCode::UpgradeRequired_426; + case 428: + return HTTPStatusCode::PreconditionRequired_428; + case 429: + return HTTPStatusCode::TooManyRequests_429; + case 431: + return HTTPStatusCode::RequestHeaderFieldsTooLarge_431; + case 451: + return HTTPStatusCode::UnavailableForLegalReasons_451; + case 500: + return HTTPStatusCode::InternalServerError_500; + case 501: + return HTTPStatusCode::NotImplemented_501; + case 502: + return HTTPStatusCode::BadGateway_502; + case 503: + return HTTPStatusCode::ServiceUnavailable_503; + case 504: + return HTTPStatusCode::GatewayTimeout_504; + case 505: + return HTTPStatusCode::HttpVersionNotSupported_505; + case 506: + return HTTPStatusCode::VariantAlsoNegotiates_506; + case 507: + return HTTPStatusCode::InsufficientStorage_507; + case 508: + return HTTPStatusCode::LoopDetected_508; + case 510: + return HTTPStatusCode::NotExtended_510; + case 511: + return HTTPStatusCode::NetworkAuthenticationRequired_511; + default: + return HTTPStatusCode::INVALID; + } +} + +unique_ptr TransformResponse(duckdb_httplib::Result &res) { + auto status_code = HTTPUtil::ToStatusCode(res ? res->status : 0); + auto result = make_uniq(status_code); + if (res.error() == duckdb_httplib::Error::Success) { + auto &response = res.value(); + result->body = response.body; + result->reason = response.reason; + for (auto &entry : response.headers) { + result->headers.Insert(entry.first, entry.second); + } + } else { + result->request_error = to_string(res.error()); + } + return result; +} + +HTTPResponse::HTTPResponse(HTTPStatusCode code) : status(code) { +} + +bool HTTPResponse::HasHeader(const string &key) const { + return headers.HasHeader(key); +} + +string HTTPResponse::GetHeaderValue(const string &key) const { + return headers.GetHeaderValue(key); +} + +bool HTTPResponse::Success() const { + return success; +} + +bool HTTPResponse::HasRequestError() const { + return !request_error.empty(); +} + +const string &HTTPResponse::GetRequestError() const { + return request_error; +} + +HTTPUtil &HTTPUtil::Get(DatabaseInstance &db) { + return *db.config.http_util; +} + +unique_ptr HTTPUtil::Request(DatabaseInstance &db, const string &url, const HTTPHeaders &headers, + optional_ptr http_logger) { + string no_http = StringUtil::Replace(url, "http://", ""); + + idx_t next = no_http.find('/', 0); + if (next == string::npos) { + throw IOException("No slash in URL template"); + } + + // Push the substring [last, next) on to splits + auto hostname_without_http = no_http.substr(0, next); + auto url_local_part = no_http.substr(next); + + auto url_base = "http://" + hostname_without_http; + + duckdb_httplib::Headers httplib_headers; + for (auto &header : headers) { + httplib_headers.insert({header.first, header.second}); + } + + // FIXME: the retry logic should be unified with the retry logic in the httpfs client + static constexpr idx_t MAX_RETRY_COUNT = 3; + static constexpr uint64_t RETRY_WAIT_MS = 100; + static constexpr double RETRY_BACKOFF = 4; + idx_t retry_count = 0; + duckdb_httplib::Result res; + while (true) { + duckdb_httplib::Client cli(url_base.c_str()); + if (!db.config.options.http_proxy.empty()) { + idx_t port; + string host; + HTTPUtil::ParseHTTPProxyHost(db.config.options.http_proxy, host, port); + cli.set_proxy(host, NumericCast(port)); + } + + if (!db.config.options.http_proxy_username.empty() || !db.config.options.http_proxy_password.empty()) { + cli.set_proxy_basic_auth(db.config.options.http_proxy_username, db.config.options.http_proxy_password); + } + + if (http_logger) { + cli.set_logger(http_logger->GetLogger()); + } + + res = cli.Get(url_local_part.c_str(), httplib_headers); + if (res && res->status == 304) { + return make_uniq(HTTPStatusCode::NotModified_304); + } + if (res && res->status == 200) { + // success! + return TransformResponse(res); + } + // failure - check if we should retry + bool should_retry = false; + if (res.error() == duckdb_httplib::Error::Success) { + switch (res->status) { + case 408: // Request Timeout + case 418: // Server is pretending to be a teapot + case 429: // Rate limiter hit + case 500: // Server has error + case 503: // Server has error + case 504: // Server has error + should_retry = true; + break; + default: + break; + } + } else { + // always retry on duckdb_httplib::Error::Error + should_retry = true; + } + retry_count++; + if (!should_retry || retry_count >= MAX_RETRY_COUNT) { + // if we should not retry or exceeded the number of retries - bubble up the error + auto result = TransformResponse(res); + result->success = false; + return result; + } +#ifndef DUCKDB_NO_THREADS + // retry + // sleep first + uint64_t sleep_amount = static_cast(static_cast(RETRY_WAIT_MS) * + pow(RETRY_BACKOFF, static_cast(retry_count) - 1)); + std::this_thread::sleep_for(std::chrono::milliseconds(sleep_amount)); +#endif + } +} + +void HTTPUtil::ParseHTTPProxyHost(string &proxy_value, string &hostname_out, idx_t &port_out, idx_t default_port) { + auto sanitized_proxy_value = proxy_value; + if (StringUtil::StartsWith(proxy_value, "http://")) { + sanitized_proxy_value = proxy_value.substr(7); + } + auto proxy_split = StringUtil::Split(sanitized_proxy_value, ":"); + if (proxy_split.size() == 1) { + hostname_out = proxy_split[0]; + port_out = default_port; + } else if (proxy_split.size() == 2) { + idx_t port; + if (!TryCast::Operation(proxy_split[1], port, false)) { + throw InvalidInputException("Failed to parse port from http_proxy '%s'", proxy_value); + } + hostname_out = proxy_split[0]; + port_out = port; + } else { + throw InvalidInputException("Failed to parse http_proxy '%s' into a host and port", proxy_value); + } +} + +} // namespace duckdb diff --git a/src/duckdb/src/parallel/pipeline_finish_event.cpp b/src/duckdb/src/parallel/pipeline_finish_event.cpp index b933e7ead..2dfc3a7ab 100644 --- a/src/duckdb/src/parallel/pipeline_finish_event.cpp +++ b/src/duckdb/src/parallel/pipeline_finish_event.cpp @@ -46,6 +46,10 @@ class PipelineFinishTask : public ExecutorTask { return TaskExecutionResult::TASK_FINISHED; } + string TaskType() const override { + return "PipelineFinishTask"; + } + private: #ifdef DUCKDB_DEBUG_ASYNC_SINK_SOURCE //! Debugging state: number of times blocked diff --git a/src/duckdb/src/parallel/pipeline_initialize_event.cpp b/src/duckdb/src/parallel/pipeline_initialize_event.cpp index 2d9e460d5..1d584939a 100644 --- a/src/duckdb/src/parallel/pipeline_initialize_event.cpp +++ b/src/duckdb/src/parallel/pipeline_initialize_event.cpp @@ -22,6 +22,10 @@ class PipelineInitializeTask : public ExecutorTask { event->FinishTask(); return TaskExecutionResult::TASK_FINISHED; } + + string TaskType() const override { + return "PipelineInitializeTask"; + } }; void PipelineInitializeEvent::Schedule() { diff --git a/src/duckdb/src/parallel/pipeline_prepare_finish_event.cpp b/src/duckdb/src/parallel/pipeline_prepare_finish_event.cpp index 2cca2d11e..10154e27f 100644 --- a/src/duckdb/src/parallel/pipeline_prepare_finish_event.cpp +++ b/src/duckdb/src/parallel/pipeline_prepare_finish_event.cpp @@ -20,6 +20,10 @@ class PipelinePreFinishTask : public ExecutorTask { event->FinishTask(); return TaskExecutionResult::TASK_FINISHED; } + + string TaskType() const override { + return "PipelinePreFinishTask"; + } }; void PipelinePrepareFinishEvent::Schedule() { diff --git a/src/duckdb/src/storage/table/row_group_collection.cpp b/src/duckdb/src/storage/table/row_group_collection.cpp index bdd6fe486..d2b21de03 100644 --- a/src/duckdb/src/storage/table/row_group_collection.cpp +++ b/src/duckdb/src/storage/table/row_group_collection.cpp @@ -787,6 +787,10 @@ class CheckpointTask : public BaseCheckpointTask { checkpoint_state.write_data[index] = row_group.WriteToDisk(*checkpoint_state.writers[index]); } + string TaskType() const override { + return "CheckpointTask"; + } + private: idx_t index; }; @@ -908,6 +912,10 @@ class VacuumTask : public BaseCheckpointTask { } } + string TaskType() const override { + return "VacuumTask"; + } + private: VacuumState &vacuum_state; idx_t segment_idx; diff --git a/src/duckdb/ub_src_common.cpp b/src/duckdb/ub_src_common.cpp index bf89076d5..16703bf71 100644 --- a/src/duckdb/ub_src_common.cpp +++ b/src/duckdb/ub_src_common.cpp @@ -36,8 +36,6 @@ #include "src/common/hive_partitioning.cpp" -#include "src/common/http_util.cpp" - #include "src/common/pipe_file_system.cpp" #include "src/common/local_file_system.cpp" diff --git a/src/duckdb/ub_src_main_http.cpp b/src/duckdb/ub_src_main_http.cpp new file mode 100644 index 000000000..e21e01e04 --- /dev/null +++ b/src/duckdb/ub_src_main_http.cpp @@ -0,0 +1,2 @@ +#include "src/main/http/http_util.cpp" +