Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion src/duckdb/extension/icu/icu-strptime.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -221,8 +221,9 @@ struct ICUStrptime : public ICUDateFunc {
if (!error.empty()) {
throw InvalidInputException("Failed to parse format specifier %s: %s", format_string, error);
}
// If any format has UTC offsets, then we have to produce TSTZ
// If any format has UTC offsets or names, then we have to produce TSTZ
has_tz = has_tz || format.HasFormatSpecifier(StrTimeSpecifier::TZ_NAME);
has_tz = has_tz || format.HasFormatSpecifier(StrTimeSpecifier::UTC_OFFSET);
formats.emplace_back(format);
}
if (has_tz) {
Expand Down
919 changes: 467 additions & 452 deletions src/duckdb/extension/parquet/parquet_metadata.cpp

Large diffs are not rendered by default.

9 changes: 9 additions & 0 deletions src/duckdb/src/common/string_util.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -572,6 +572,15 @@ static unique_ptr<ComplexJSON> ParseJSON(const string &json, yyjson_doc *doc, yy
const bool bool_val = yyjson_get_bool(root);
return make_uniq<ComplexJSON>(bool_val ? "true" : "false");
}
case YYJSON_TYPE_NUM | YYJSON_SUBTYPE_UINT:
return make_uniq<ComplexJSON>(to_string(unsafe_yyjson_get_uint(root)));
case YYJSON_TYPE_NUM | YYJSON_SUBTYPE_SINT:
return make_uniq<ComplexJSON>(to_string(unsafe_yyjson_get_sint(root)));
case YYJSON_TYPE_NUM | YYJSON_SUBTYPE_REAL:
case YYJSON_TYPE_RAW | YYJSON_SUBTYPE_NONE:
return make_uniq<ComplexJSON>(to_string(unsafe_yyjson_get_real(root)));
case YYJSON_TYPE_NULL | YYJSON_SUBTYPE_NONE:
return make_uniq<ComplexJSON>("null");
default:
yyjson_doc_free(doc);
throw SerializationException("Failed to parse JSON string: %s", json);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -797,7 +797,7 @@ void StringValueResult::NullPaddingQuotedNewlineCheck() const {
// If null_padding is set, a quoted newline was found, and the file is being scanned in parallel, we raise an error.
LinesPerBoundary lines_per_batch(iterator.GetBoundaryIdx(), lines_read);
auto csv_error = CSVError::NullPaddingFail(state_machine.options, lines_per_batch, path);
error_handler.Error(csv_error, try_row);
error_handler.Error(csv_error, true);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ SourceResultType PhysicalAttach::GetData(ExecutionContext &context, DataChunk &c
if (info->on_conflict == OnCreateConflict::IGNORE_ON_CONFLICT ||
info->on_conflict == OnCreateConflict::REPLACE_ON_CONFLICT) {
// constant-time lookup in the catalog for the db name
auto existing_db = db_manager.GetDatabase(context.client, name);
auto existing_db = db_manager.GetDatabase(name);
if (existing_db) {
if ((existing_db->IsReadOnly() && options.access_mode == AccessMode::READ_WRITE) ||
(!existing_db->IsReadOnly() && options.access_mode == AccessMode::READ_ONLY)) {
Expand Down
209 changes: 160 additions & 49 deletions src/duckdb/src/function/compression_config.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -34,80 +34,191 @@ static const DefaultCompressionMethod internal_compression_methods[] = {
DictFSSTCompressionFun::TypeIsSupported},
{CompressionType::COMPRESSION_AUTO, nullptr, nullptr}};

static optional_ptr<CompressionFunction> FindCompressionFunction(CompressionFunctionSet &set, CompressionType type,
const PhysicalType physical_type) {
auto &functions = set.functions;
auto comp_entry = functions.find(type);
if (comp_entry != functions.end()) {
auto &type_functions = comp_entry->second;
auto type_entry = type_functions.find(physical_type);
if (type_entry != type_functions.end()) {
return &type_entry->second;
//! Maps a physical type to its fixed slot number (0 through 18).
//! The slot is what this file uses to index the per-physical-type `functions` and `is_loaded` containers.
//! Throws an InternalException for any physical type that has no slot.
idx_t CompressionFunctionSet::GetCompressionIndex(PhysicalType physical_type) {
	idx_t slot = 0;
	switch (physical_type) {
	case PhysicalType::BOOL:
		slot = 0;
		break;
	case PhysicalType::UINT8:
		slot = 1;
		break;
	case PhysicalType::INT8:
		slot = 2;
		break;
	case PhysicalType::UINT16:
		slot = 3;
		break;
	case PhysicalType::INT16:
		slot = 4;
		break;
	case PhysicalType::UINT32:
		slot = 5;
		break;
	case PhysicalType::INT32:
		slot = 6;
		break;
	case PhysicalType::UINT64:
		slot = 7;
		break;
	case PhysicalType::INT64:
		slot = 8;
		break;
	case PhysicalType::FLOAT:
		slot = 9;
		break;
	case PhysicalType::DOUBLE:
		slot = 10;
		break;
	case PhysicalType::INTERVAL:
		slot = 11;
		break;
	case PhysicalType::LIST:
		slot = 12;
		break;
	case PhysicalType::STRUCT:
		slot = 13;
		break;
	case PhysicalType::ARRAY:
		slot = 14;
		break;
	case PhysicalType::VARCHAR:
		slot = 15;
		break;
	case PhysicalType::UINT128:
		slot = 16;
		break;
	case PhysicalType::INT128:
		slot = 17;
		break;
	case PhysicalType::BIT:
		slot = 18;
		break;
	default:
		// no slot is reserved for any other physical type
		throw InternalException("Unsupported physical type for compression index");
	}
	return slot;
}

//! Maps a compression type to a slot number by taking the enum's numeric value.
//! NOTE(review): the result is used to index `is_disabled`, which has COMPRESSION_TYPE_COUNT entries -
//! confirm that every CompressionType value passed here is smaller than that count.
idx_t CompressionFunctionSet::GetCompressionIndex(CompressionType type) {
	const auto slot = static_cast<idx_t>(type);
	return slot;
}

//! Initializes the set: every physical-type slot is marked as not loaded, every compression
//! method is marked as enabled, and `functions` is resized to PHYSICAL_TYPE_COUNT entries.
CompressionFunctionSet::CompressionFunctionSet() {
	for (auto &loaded_flag : is_loaded) {
		loaded_flag = false;
	}
	ResetDisabledMethods();
	functions.resize(PHYSICAL_TYPE_COUNT);
}

//! Returns true for the compression types listed below and false for every other type.
//! GetCompressionFunctions uses this to decide which loaded functions end up in its result.
bool EmitCompressionFunction(CompressionType type) {
	bool emit = false;
	switch (type) {
	case CompressionType::COMPRESSION_UNCOMPRESSED:
	case CompressionType::COMPRESSION_RLE:
	case CompressionType::COMPRESSION_BITPACKING:
	case CompressionType::COMPRESSION_DICTIONARY:
	case CompressionType::COMPRESSION_CHIMP:
	case CompressionType::COMPRESSION_PATAS:
	case CompressionType::COMPRESSION_ALP:
	case CompressionType::COMPRESSION_ALPRD:
	case CompressionType::COMPRESSION_FSST:
	case CompressionType::COMPRESSION_ZSTD:
	case CompressionType::COMPRESSION_ROARING:
	case CompressionType::COMPRESSION_DICT_FSST:
		emit = true;
		break;
	default:
		// anything not listed above is never emitted
		break;
	}
	return emit;
}

//! Returns references to the compression functions loaded for `physical_type`, leaving out
//! every function whose compression type is flagged in `is_disabled` or is rejected by
//! EmitCompressionFunction. The function list for the type is loaded first if necessary.
vector<reference<CompressionFunction>> CompressionFunctionSet::GetCompressionFunctions(PhysicalType physical_type) {
// make sure the per-type list has been populated before it is read
LoadCompressionFunctions(physical_type);
auto index = GetCompressionIndex(physical_type);
auto &function_list = functions[index];
vector<reference<CompressionFunction>> result;
for (auto &entry : function_list) {
auto compression_index = GetCompressionIndex(entry.type);
if (is_disabled[compression_index]) {
// explicitly disabled
continue;
}
// skip compression types that EmitCompressionFunction does not accept
if (!EmitCompressionFunction(entry.type)) {
continue;
}
result.push_back(entry);
}
// NOTE(review): the `return nullptr;` below looks like a removed line left over from the rendered diff - confirm
return nullptr;
return result;
}

//! Populates the function list for `physical_type` the first time it is requested.
//! Calls that find the slot's `is_loaded` flag already set return immediately; otherwise the
//! load runs while holding `lock`, and the flag is re-checked after the lock is acquired.
//! The list is assembled in a local vector and only swapped into `functions` once every
//! method has been tried, so an exception thrown during loading cannot leave a partially
//! filled list behind (a later retry would otherwise append duplicates to it).
void CompressionFunctionSet::LoadCompressionFunctions(PhysicalType physical_type) {
	auto index = GetCompressionIndex(physical_type);
	if (is_loaded[index]) {
		return;
	}
	// not loaded - try to load it
	lock_guard<mutex> guard(lock);
	// verify nobody loaded it in the mean-time
	if (is_loaded[index]) {
		return;
	}
	// actually perform the load into a scratch list; the table walk stops at the first
	// entry without a get_function callback
	vector<CompressionFunction> loaded;
	for (idx_t i = 0; internal_compression_methods[i].get_function; i++) {
		TryLoadCompression(internal_compression_methods[i].type, physical_type, loaded);
	}
	// install the complete list, then publish it by setting the flag
	functions[index].swap(loaded);
	is_loaded[index] = true;
}

static optional_ptr<CompressionFunction> LoadCompressionFunction(CompressionFunctionSet &set, CompressionType type,
const PhysicalType physical_type) {
void CompressionFunctionSet::TryLoadCompression(CompressionType type, PhysicalType physical_type,
vector<CompressionFunction> &result) {
for (idx_t i = 0; internal_compression_methods[i].get_function; i++) {
const auto &method = internal_compression_methods[i];
if (method.type == type) {
if (!method.supports_type(physical_type)) {
return nullptr;
// not supported for this type
return;
}
// The type is supported. We create the function and insert it into the set of available functions.
auto function = method.get_function(physical_type);
set.functions[type].insert(make_pair(physical_type, function));
return FindCompressionFunction(set, type, physical_type);
result.push_back(method.get_function(physical_type));
return;
}
}
throw InternalException("Unsupported compression function type");
}

static void TryLoadCompression(DBConfig &config, vector<reference<CompressionFunction>> &result, CompressionType type,
const PhysicalType physical_type) {
if (config.options.disabled_compression_methods.find(type) != config.options.disabled_compression_methods.end()) {
// explicitly disabled
return;
optional_ptr<CompressionFunction> CompressionFunctionSet::GetCompressionFunction(CompressionType type,
const PhysicalType physical_type) {
LoadCompressionFunctions(physical_type);

auto index = GetCompressionIndex(physical_type);
auto &function_list = functions[index];
for (auto &function : function_list) {
if (function.type == type) {
return function;
}
}
auto function = config.GetCompressionFunction(type, physical_type);
if (!function) {
return;
return nullptr;
}

void CompressionFunctionSet::SetDisabledCompressionMethods(const vector<CompressionType> &methods) {
ResetDisabledMethods();
for (auto &method : methods) {
auto idx = GetCompressionIndex(method);
is_disabled[idx] = true;
}
result.push_back(*function);
}

vector<reference<CompressionFunction>> DBConfig::GetCompressionFunctions(const PhysicalType physical_type) {
vector<reference<CompressionFunction>> result;
TryLoadCompression(*this, result, CompressionType::COMPRESSION_UNCOMPRESSED, physical_type);
TryLoadCompression(*this, result, CompressionType::COMPRESSION_RLE, physical_type);
TryLoadCompression(*this, result, CompressionType::COMPRESSION_BITPACKING, physical_type);
TryLoadCompression(*this, result, CompressionType::COMPRESSION_DICTIONARY, physical_type);
TryLoadCompression(*this, result, CompressionType::COMPRESSION_CHIMP, physical_type);
TryLoadCompression(*this, result, CompressionType::COMPRESSION_PATAS, physical_type);
TryLoadCompression(*this, result, CompressionType::COMPRESSION_ALP, physical_type);
TryLoadCompression(*this, result, CompressionType::COMPRESSION_ALPRD, physical_type);
TryLoadCompression(*this, result, CompressionType::COMPRESSION_FSST, physical_type);
TryLoadCompression(*this, result, CompressionType::COMPRESSION_ZSTD, physical_type);
TryLoadCompression(*this, result, CompressionType::COMPRESSION_ROARING, physical_type);
TryLoadCompression(*this, result, CompressionType::COMPRESSION_DICT_FSST, physical_type);
//! Replaces the set of disabled compression methods by delegating to `compression_functions`.
void DBConfig::SetDisabledCompressionMethods(const vector<CompressionType> &methods) {
	auto &function_set = *compression_functions;
	function_set.SetDisabledCompressionMethods(methods);
}

//! Collects, in ascending slot order, the compression type of every slot whose
//! `is_disabled` flag is set.
vector<CompressionType> CompressionFunctionSet::GetDisabledCompressionMethods() const {
	vector<CompressionType> disabled;
	for (idx_t slot = 0; slot < COMPRESSION_TYPE_COUNT; slot++) {
		if (!is_disabled[slot]) {
			continue;
		}
		// the slot number is the numeric value of the compression type
		disabled.push_back(static_cast<CompressionType>(slot));
	}
	return disabled;
}

optional_ptr<CompressionFunction> DBConfig::GetCompressionFunction(CompressionType type,
const PhysicalType physical_type) {
lock_guard<mutex> l(compression_functions->lock);
//! Returns the currently disabled compression methods as reported by `compression_functions`.
vector<CompressionType> DBConfig::GetDisabledCompressionMethods() const {
	const auto &function_set = *compression_functions;
	return function_set.GetDisabledCompressionMethods();
}

// Check if the function is already loaded into the global compression functions.
auto function = FindCompressionFunction(*compression_functions, type, physical_type);
if (function) {
return function;
//! Clears every `is_disabled` flag, i.e. marks all compression methods as enabled.
void CompressionFunctionSet::ResetDisabledMethods() {
	for (auto &disabled_flag : is_disabled) {
		disabled_flag = false;
	}
}

// We could not find the function in the global compression functions,
// so we attempt loading it.
return LoadCompressionFunction(*compression_functions, type, physical_type);
//! Returns the compression functions for `physical_type` by delegating to `compression_functions`.
vector<reference<CompressionFunction>> DBConfig::GetCompressionFunctions(const PhysicalType physical_type) {
	auto &function_set = *compression_functions;
	return function_set.GetCompressionFunctions(physical_type);
}

//! Looks up the compression function for the given compression type and physical type
//! by delegating to `compression_functions`.
optional_ptr<CompressionFunction> DBConfig::GetCompressionFunction(CompressionType type,
                                                                   const PhysicalType physical_type) {
	auto &function_set = *compression_functions;
	return function_set.GetCompressionFunction(type, physical_type);
}

} // namespace duckdb
6 changes: 3 additions & 3 deletions src/duckdb/src/function/table/version/pragma_version.cpp
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
#ifndef DUCKDB_PATCH_VERSION
#define DUCKDB_PATCH_VERSION "0-dev4261"
#define DUCKDB_PATCH_VERSION "1-dev205"
#endif
#ifndef DUCKDB_MINOR_VERSION
#define DUCKDB_MINOR_VERSION 4
Expand All @@ -8,10 +8,10 @@
#define DUCKDB_MAJOR_VERSION 1
#endif
#ifndef DUCKDB_VERSION
#define DUCKDB_VERSION "v1.4.0-dev4261"
#define DUCKDB_VERSION "v1.4.1-dev205"
#endif
#ifndef DUCKDB_SOURCE_ID
#define DUCKDB_SOURCE_ID "05a2403cdd"
#define DUCKDB_SOURCE_ID "d52dd4e3df"
#endif
#include "duckdb/function/table/system_functions.hpp"
#include "duckdb/main/database.hpp"
Expand Down
26 changes: 25 additions & 1 deletion src/duckdb/src/include/duckdb/function/compression_function.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
#include "duckdb/storage/data_pointer.hpp"
#include "duckdb/storage/storage_info.hpp"
#include "duckdb/storage/block_manager.hpp"
#include "duckdb/storage/storage_lock.hpp"

namespace duckdb {
class DatabaseInstance;
Expand Down Expand Up @@ -333,8 +334,31 @@ class CompressionFunction {

//! The set of compression functions
//! Holds, per physical type, the list of compression functions loaded for that type, plus a
//! per-compression-type flag recording whether the method has been disabled.
struct CompressionFunctionSet {
//! Number of entries in `is_disabled`; entries are addressed with GetCompressionIndex(CompressionType)
//! NOTE(review): presumably has to be larger than the highest CompressionType value - confirm against the enum
static constexpr idx_t COMPRESSION_TYPE_COUNT = 15;
//! Number of entries in `is_loaded`; entries are addressed with GetCompressionIndex(PhysicalType)
static constexpr idx_t PHYSICAL_TYPE_COUNT = 19;

public:
CompressionFunctionSet();

//! Returns the compression functions for the physical type
vector<reference<CompressionFunction>> GetCompressionFunctions(PhysicalType physical_type);
//! Returns the compression function matching the compression and physical type
optional_ptr<CompressionFunction> GetCompressionFunction(CompressionType type, PhysicalType physical_type);
//! Sets the disabled compression methods
void SetDisabledCompressionMethods(const vector<CompressionType> &methods);
//! Returns a list of disabled compression methods
vector<CompressionType> GetDisabledCompressionMethods() const;

private:
//! Mutex guarding this set
mutex lock;
//! NOTE(review): two `functions` declarations appear in this struct, presumably the removed and the
//! added line of the rendered diff - confirm which one is current
map<CompressionType, map<PhysicalType, CompressionFunction>> functions;
//! One flag per compression type slot
atomic<bool> is_disabled[COMPRESSION_TYPE_COUNT];
//! One flag per physical type slot
atomic<bool> is_loaded[PHYSICAL_TYPE_COUNT];
//! One list of compression functions per physical type slot
vector<vector<CompressionFunction>> functions;

private:
//! Loads the compression functions for the physical type
void LoadCompressionFunctions(PhysicalType physical_type);

//! Tries to load the compression function of the given type for the physical type into `result`
static void TryLoadCompression(CompressionType type, PhysicalType physical_type,
vector<CompressionFunction> &result);
//! Slot number of a physical type
static idx_t GetCompressionIndex(PhysicalType physical_type);
//! Slot number of a compression type
static idx_t GetCompressionIndex(CompressionType type);
//! Resets the disabled compression methods
void ResetDisabledMethods();
};

} // namespace duckdb
6 changes: 4 additions & 2 deletions src/duckdb/src/include/duckdb/main/config.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -161,8 +161,6 @@ struct DBConfigOptions {
uint64_t zstd_min_string_length = 4096;
//! Force a specific compression method to be used when checkpointing (if available)
CompressionType force_compression = CompressionType::COMPRESSION_AUTO;
//! The set of disabled compression methods (default empty)
set<CompressionType> disabled_compression_methods;
//! Force a specific bitpacking mode to be used when using the bitpacking compression method
BitpackingMode force_bitpacking_mode = BitpackingMode::AUTO;
//! Database configuration variables as controlled by SET
Expand Down Expand Up @@ -316,6 +314,10 @@ struct DBConfig {
//! Returns the compression function matching the compression and physical type.
DUCKDB_API optional_ptr<CompressionFunction> GetCompressionFunction(CompressionType type,
const PhysicalType physical_type);
//! Sets the disabled compression methods
DUCKDB_API void SetDisabledCompressionMethods(const vector<CompressionType> &disabled_compression_methods);
//! Returns a list of disabled compression methods
DUCKDB_API vector<CompressionType> GetDisabledCompressionMethods() const;

//! Returns the encode function matching the encoding name.
DUCKDB_API optional_ptr<EncodingFunction> GetEncodeFunction(const string &name) const;
Expand Down
3 changes: 3 additions & 0 deletions src/duckdb/src/include/duckdb/main/database_manager.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ class DatabaseManager {
void FinalizeStartup();
//! Get an attached database by its name
optional_ptr<AttachedDatabase> GetDatabase(ClientContext &context, const string &name);
shared_ptr<AttachedDatabase> GetDatabase(const string &name);
//! Attach a new database
shared_ptr<AttachedDatabase> AttachDatabase(ClientContext &context, AttachInfo &info, AttachOptions &options);

Expand Down Expand Up @@ -106,6 +107,8 @@ class DatabaseManager {
//! Gets a list of all attached database paths
vector<string> GetAttachedDatabasePaths();

shared_ptr<AttachedDatabase> GetDatabaseInternal(const lock_guard<mutex> &, const string &name);

private:
//! The system database is a special database that holds system entries (e.g. functions)
shared_ptr<AttachedDatabase> system;
Expand Down
2 changes: 2 additions & 0 deletions src/duckdb/src/include/duckdb/planner/expression_binder.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -193,6 +193,8 @@ class ExpressionBinder {

BindResult BindUnsupportedExpression(ParsedExpression &expr, idx_t depth, const string &message);

optional_ptr<CatalogEntry> BindAndQualifyFunction(FunctionExpression &function, bool allow_throw);

protected:
virtual BindResult BindGroupingFunction(OperatorExpression &op, idx_t depth);
virtual BindResult BindFunction(FunctionExpression &expr, ScalarFunctionCatalogEntry &function, idx_t depth);
Expand Down
Loading
Loading