From 56591c54f29e6d82d100cfb25add74642f384477 Mon Sep 17 00:00:00 2001 From: DuckDB Labs GitHub Bot Date: Wed, 27 Aug 2025 06:23:32 +0000 Subject: [PATCH] Update vendored DuckDB sources to fa13060a63 --- src/duckdb/src/catalog/catalog.cpp | 117 +-- .../default/default_table_functions.cpp | 2 +- .../src/catalog/default/default_views.cpp | 2 +- src/duckdb/src/common/csv_writer.cpp | 381 +++++++++ src/duckdb/src/common/file_system.cpp | 10 + .../serializer/buffered_file_writer.cpp | 4 + src/duckdb/src/common/virtual_file_system.cpp | 4 + .../buffer_manager/csv_file_handle.cpp | 8 + .../physical_plan/plan_comparison_join.cpp | 10 +- .../src/function/pragma/pragma_functions.cpp | 42 +- src/duckdb/src/function/table/copy_csv.cpp | 274 +----- .../src/function/table/system/duckdb_log.cpp | 44 +- .../table/system/duckdb_log_contexts.cpp | 21 +- .../function/table/system/logging_utils.cpp | 151 ++++ .../src/function/table/system_functions.cpp | 1 + .../function/table/version/pragma_version.cpp | 6 +- .../src/include/duckdb/catalog/catalog.hpp | 9 +- .../src/include/duckdb/common/csv_writer.hpp | 153 ++++ .../include/duckdb/common/file_open_flags.hpp | 7 + .../src/include/duckdb/common/file_system.hpp | 1 + .../duckdb/common/opener_file_system.hpp | 4 + .../duckdb/common/virtual_file_system.hpp | 1 + .../duckdb/function/table/read_csv.hpp | 14 +- .../function/table/system_functions.hpp | 4 + .../include/duckdb/logging/log_manager.hpp | 12 +- .../include/duckdb/logging/log_storage.hpp | 319 +++++-- .../src/include/duckdb/main/settings.hpp | 4 +- src/duckdb/src/logging/log_manager.cpp | 57 +- src/duckdb/src/logging/log_storage.cpp | 792 +++++++++++++++--- src/duckdb/src/main/http/http_util.cpp | 8 +- .../statement/transform_pivot_stmt.cpp | 2 + .../expression/bind_operator_expression.cpp | 14 +- src/duckdb/src/planner/pragma_handler.cpp | 2 +- src/duckdb/ub_src_common.cpp | 2 + src/duckdb/ub_src_function_table_system.cpp | 2 + 35 files changed, 1915 insertions(+), 569 deletions(-) create mode 100644 src/duckdb/src/common/csv_writer.cpp create mode 100644 src/duckdb/src/function/table/system/logging_utils.cpp create mode 100644 src/duckdb/src/include/duckdb/common/csv_writer.hpp diff --git a/src/duckdb/src/catalog/catalog.cpp b/src/duckdb/src/catalog/catalog.cpp index 1f9dd82e1..fba83e72c 100644 --- a/src/duckdb/src/catalog/catalog.cpp +++ b/src/duckdb/src/catalog/catalog.cpp @@ -838,17 +838,37 @@ CatalogEntryLookup Catalog::LookupEntry(CatalogEntryRetriever &retriever, const return res; } +static void ThrowDefaultTableAmbiguityException(CatalogEntryLookup &base_lookup, CatalogEntryLookup &default_table, + const string &name) { + auto entry_type = CatalogTypeToString(base_lookup.entry->type); + string fully_qualified_name_hint; + if (base_lookup.schema) { + fully_qualified_name_hint = StringUtil::Format(": '%s.%s.%s'", base_lookup.schema->catalog.GetName(), + base_lookup.schema->name, base_lookup.entry->name); + } + string fully_qualified_catalog_name_hint = StringUtil::Format( + ": '%s.%s.%s'", default_table.schema->catalog.GetName(), default_table.schema->name, default_table.entry->name); + throw CatalogException( + "Ambiguity detected for '%s': this could either refer to the '%s' '%s', or the " + "attached catalog '%s' which has a default table. To avoid this error, either detach the catalog and " + "reattach under a different name, or use a fully qualified name for the '%s'%s or for the Catalog " + "Default Table%s.", + name, entry_type, name, name, entry_type, fully_qualified_name_hint, fully_qualified_catalog_name_hint); +} + CatalogEntryLookup Catalog::TryLookupEntry(CatalogEntryRetriever &retriever, const vector &lookups, - const EntryLookupInfo &lookup_info, OnEntryNotFound if_not_found) { + const EntryLookupInfo &lookup_info, OnEntryNotFound if_not_found, + bool allow_default_table_lookup) { auto &context = retriever.GetContext(); reference_set_t schemas; bool all_errors = true; ErrorData error_data; + CatalogEntryLookup result; for (auto &lookup : lookups) { auto transaction = lookup.catalog.GetCatalogTransaction(context); - auto result = lookup.catalog.TryLookupEntryInternal(transaction, lookup.schema, lookup.lookup_info); + result = lookup.catalog.TryLookupEntryInternal(transaction, lookup.schema, lookup.lookup_info); if (result.Found()) { - return result; + break; } if (result.schema) { schemas.insert(*result.schema); @@ -859,6 +879,29 @@ CatalogEntryLookup Catalog::TryLookupEntry(CatalogEntryRetriever &retriever, con error_data = std::move(result.error); } } + + // Special case for tables: we do a second lookup searching for catalogs with default tables that also match this + // lookup + if (lookup_info.GetCatalogType() == CatalogType::TABLE_ENTRY && allow_default_table_lookup) { + if (!result.Found()) { + result = TryLookupDefaultTable(retriever, lookup_info, false); + if (result.error.HasError()) { + error_data = std::move(result.error); + } + } else { + // allow_ignore_at_clause set to true to ensure `FROM AT ` is considered + // ambiguous with a default table lookup in a catalog that does not support time travel + auto ambiguity_lookup = TryLookupDefaultTable(retriever, lookup_info, true); + if (ambiguity_lookup.Found()) { + ThrowDefaultTableAmbiguityException(result, ambiguity_lookup, lookup_info.GetEntryName()); + } + } + } + + if (result.Found()) { + return result; + } + if (all_errors && error_data.HasError()) { error_data.Throw(); } @@ -878,41 +921,29 @@ CatalogEntryLookup Catalog::TryLookupEntry(CatalogEntryRetriever &retriever, con } } -CatalogEntryLookup Catalog::TryLookupDefaultTable(CatalogEntryRetriever &retriever, const string &catalog, - const string &schema, const EntryLookupInfo &lookup_info, - OnEntryNotFound if_not_found) { - // Default tables of catalogs can only be accessed by the catalog name directly - if (!schema.empty() || !catalog.empty()) { - return {nullptr, nullptr, ErrorData()}; - } - - vector catalog_by_name_lookups; +CatalogEntryLookup Catalog::TryLookupDefaultTable(CatalogEntryRetriever &retriever, const EntryLookupInfo &lookup_info, + bool allow_ignore_at_clause) { auto catalog_by_name = GetCatalogEntry(retriever, lookup_info.GetEntryName()); + if (catalog_by_name && catalog_by_name->HasDefaultTable()) { - catalog_by_name_lookups.emplace_back(*catalog_by_name, CatalogType::TABLE_ENTRY, - catalog_by_name->GetDefaultTableSchema(), - catalog_by_name->GetDefaultTable()); - } + auto transaction = catalog_by_name->GetCatalogTransaction(retriever.GetContext()); + QueryErrorContext context; - return TryLookupEntry(retriever, catalog_by_name_lookups, lookup_info, if_not_found); -} + string table_schema = catalog_by_name->GetDefaultTableSchema(); + string table_name = catalog_by_name->GetDefaultTable(); -static void ThrowDefaultTableAmbiguityException(CatalogEntryLookup &base_lookup, CatalogEntryLookup &default_table, - const string &name) { - auto entry_type = CatalogTypeToString(base_lookup.entry->type); - string fully_qualified_name_hint; - if (base_lookup.schema) { - fully_qualified_name_hint = StringUtil::Format(": '%s.%s.%s'", base_lookup.schema->catalog.GetName(), - base_lookup.schema->name, base_lookup.entry->name); + optional_ptr at_clause; + if (!catalog_by_name->SupportsTimeTravel() && allow_ignore_at_clause) { + at_clause = nullptr; + } else { + at_clause = lookup_info.GetAtClause(); + } + + EntryLookupInfo info = EntryLookupInfo(CatalogType::TABLE_ENTRY, table_name, at_clause, context); + return catalog_by_name->TryLookupEntryInternal(transaction, table_schema, info); } - string fully_qualified_catalog_name_hint = StringUtil::Format( - ": '%s.%s.%s'", default_table.schema->catalog.GetName(), default_table.schema->name, default_table.entry->name); - throw CatalogException( - "Ambiguity detected for '%s': this could either refer to the '%s' '%s', or the " - "attached catalog '%s' which has a default table. To avoid this error, either detach the catalog and " - "reattach under a different name, or use a fully qualified name for the '%s'%s or for the Catalog " - "Default Table%s.", - name, entry_type, name, name, entry_type, fully_qualified_name_hint, fully_qualified_catalog_name_hint); + + return {nullptr, nullptr, ErrorData()}; } CatalogEntryLookup Catalog::TryLookupEntry(CatalogEntryRetriever &retriever, const string &catalog, @@ -945,25 +976,9 @@ CatalogEntryLookup Catalog::TryLookupEntry(CatalogEntryRetriever &retriever, con lookups.emplace_back(std::move(lookup)); } - // Do the main lookup - auto lookup_result = TryLookupEntry(retriever, lookups, lookup_info, if_not_found); - - // Special case for tables: we do a second lookup searching for catalogs with default tables that also match this - // lookup - if (lookup_info.GetCatalogType() == CatalogType::TABLE_ENTRY) { - auto lookup_result_default_table = - TryLookupDefaultTable(retriever, catalog, schema, lookup_info, OnEntryNotFound::RETURN_NULL); - - if (lookup_result_default_table.Found() && lookup_result.Found()) { - ThrowDefaultTableAmbiguityException(lookup_result, lookup_result_default_table, lookup_info.GetEntryName()); - } - - if (lookup_result_default_table.Found()) { - return lookup_result_default_table; - } - } + bool allow_default_table_lookup = catalog.empty() && schema.empty(); - return lookup_result; + return TryLookupEntry(retriever, lookups, lookup_info, if_not_found, allow_default_table_lookup); } CatalogEntry &Catalog::GetEntry(ClientContext &context, CatalogType catalog_type, const string &catalog_name, diff --git a/src/duckdb/src/catalog/default/default_table_functions.cpp b/src/duckdb/src/catalog/default/default_table_functions.cpp index aa9d66ef4..c07786474 100644 --- a/src/duckdb/src/catalog/default/default_table_functions.cpp +++ b/src/duckdb/src/catalog/default/default_table_functions.cpp @@ -68,7 +68,7 @@ FROM histogram_values(source, col_name, bin_count := bin_count, technique := tec )"}, {DEFAULT_SCHEMA, "duckdb_logs_parsed", {"log_type"}, {}, R"( SELECT * EXCLUDE (message), UNNEST(parse_duckdb_log_message(log_type, message)) -FROM duckdb_logs +FROM duckdb_logs(denormalized_table=1) WHERE type = log_type )"}, {nullptr, nullptr, {nullptr}, {{nullptr, nullptr}}, nullptr} diff --git a/src/duckdb/src/catalog/default/default_views.cpp b/src/duckdb/src/catalog/default/default_views.cpp index 1edb4a508..023b6f800 100644 --- a/src/duckdb/src/catalog/default/default_views.cpp +++ b/src/duckdb/src/catalog/default/default_views.cpp @@ -26,7 +26,7 @@ static const DefaultView internal_views[] = { {DEFAULT_SCHEMA, "duckdb_tables", "SELECT * FROM duckdb_tables() WHERE NOT internal"}, {DEFAULT_SCHEMA, "duckdb_types", "SELECT * FROM duckdb_types()"}, {DEFAULT_SCHEMA, "duckdb_views", "SELECT * FROM duckdb_views() WHERE NOT internal"}, - {DEFAULT_SCHEMA, "duckdb_logs", "SELECT * exclude (l.rowid, l.context_id, c.context_id) FROM (SELECT row_number() OVER () AS rowid, * FROM duckdb_logs()) as l JOIN duckdb_log_contexts() as c ON l.context_id=c.context_id order by timestamp, l.rowid;"}, + {DEFAULT_SCHEMA, "duckdb_logs", "SELECT * FROM duckdb_logs(denormalized_table=true)"}, {"pg_catalog", "pg_am", "SELECT 0 oid, 'art' amname, NULL amhandler, 'i' amtype"}, {"pg_catalog", "pg_prepared_statements", "SELECT name, statement, NULL prepare_time, parameter_types, result_types, NULL from_sql, NULL generic_plans, NULL custom_plans from duckdb_prepared_statements()"}, {"pg_catalog", "pg_attribute", "SELECT table_oid attrelid, column_name attname, data_type_id atttypid, 0 attstattarget, NULL attlen, column_index attnum, 0 attndims, -1 attcacheoff, case when data_type ilike '%decimal%' then numeric_precision*1000+numeric_scale else -1 end atttypmod, false attbyval, NULL attstorage, NULL attalign, NOT is_nullable attnotnull, column_default IS NOT NULL atthasdef, false atthasmissing, '' attidentity, '' attgenerated, false attisdropped, true attislocal, 0 attinhcount, 0 attcollation, NULL attcompression, NULL attacl, NULL attoptions, NULL attfdwoptions, NULL attmissingval FROM duckdb_columns()"}, diff --git a/src/duckdb/src/common/csv_writer.cpp b/src/duckdb/src/common/csv_writer.cpp new file mode 100644 index 000000000..bb9ff81d2 --- /dev/null +++ b/src/duckdb/src/common/csv_writer.cpp @@ -0,0 +1,381 @@ +#include "duckdb/common/csv_writer.hpp" +#include "duckdb/common/serializer/write_stream.hpp" +#include "duckdb/common/serializer/memory_stream.hpp" +#include "duckdb/execution/operator/csv_scanner/csv_reader_options.hpp" +#include "duckdb/common/serializer/buffered_file_writer.hpp" + +namespace duckdb { + +static string TransformNewLine(string new_line) { + new_line = StringUtil::Replace(new_line, "\\r", "\r"); + return StringUtil::Replace(new_line, "\\n", "\n"); +} + +CSVWriterState::CSVWriterState() + : flush_size(MemoryStream::DEFAULT_INITIAL_CAPACITY), stream(make_uniq()) { +} + +CSVWriterState::CSVWriterState(ClientContext &context, idx_t flush_size_p) + : flush_size(flush_size_p), stream(make_uniq(Allocator::Get(context))) { +} + +CSVWriterState::CSVWriterState(DatabaseInstance &db, idx_t flush_size_p) + : flush_size(flush_size_p), stream(make_uniq(BufferAllocator::Get(db), flush_size)) { +} + +CSVWriterState::~CSVWriterState() { + if (stream && !Exception::UncaughtException()) { + // Ensure we don't accidentally destroy unflushed data + D_ASSERT(stream->GetPosition() == 0); + } +} + +CSVWriterOptions::CSVWriterOptions(const string &delim, const char "e, const string &write_newline) { + requires_quotes = vector(256, false); + requires_quotes['\n'] = true; + requires_quotes['\r'] = true; + requires_quotes['#'] = true; + requires_quotes[NumericCast(delim[0])] = true; + requires_quotes[NumericCast(quote)] = true; + + if (!write_newline.empty()) { + newline = TransformNewLine(write_newline); + } +} + +CSVWriterOptions::CSVWriterOptions(CSVReaderOptions &options) + : CSVWriterOptions(options.dialect_options.state_machine_options.delimiter.GetValue(), + options.dialect_options.state_machine_options.quote.GetValue(), options.write_newline) { +} + +CSVWriter::CSVWriter(WriteStream &stream, vector name_list, bool shared) + : writer_options(options.dialect_options.state_machine_options.delimiter.GetValue(), + options.dialect_options.state_machine_options.quote.GetValue(), options.write_newline), + write_stream(stream), should_initialize(true), shared(shared) { + auto size = name_list.size(); + options.name_list = std::move(name_list); + options.force_quote.resize(size, false); + options.force_quote.resize(size, false); + + if (!shared) { + global_write_state = make_uniq(); + } +} + +CSVWriter::CSVWriter(CSVReaderOptions &options_p, FileSystem &fs, const string &file_path, + FileCompressionType compression, bool shared) + : options(options_p), + writer_options(options.dialect_options.state_machine_options.delimiter.GetValue(), + options.dialect_options.state_machine_options.quote.GetValue(), options.write_newline), + file_writer(make_uniq(fs, file_path, + FileFlags::FILE_FLAGS_WRITE | FileFlags::FILE_FLAGS_FILE_CREATE_NEW | + FileLockType::WRITE_LOCK | compression)), + write_stream(*file_writer), should_initialize(true), shared(shared) { + + if (!shared) { + global_write_state = make_uniq(); + } +} + +void CSVWriter::Initialize(bool force) { + if (!force && !should_initialize) { + return; + } + + if (!options.prefix.empty()) { + WriteRawString(options.prefix); + } + + if (!(options.dialect_options.header.IsSetByUser() && !options.dialect_options.header.GetValue())) { + WriteHeader(); + } + + should_initialize = false; +} + +void CSVWriter::WriteChunk(DataChunk &input, CSVWriterState &local_state) { + WriteChunk(input, *local_state.stream, options, local_state.written_anything, writer_options); + + if (!local_state.require_manual_flush && local_state.stream->GetPosition() >= local_state.flush_size) { + Flush(local_state); + } +} + +void CSVWriter::WriteChunk(DataChunk &input) { + // Method intended for non-shared use only + D_ASSERT(!shared); + + WriteChunk(input, *global_write_state); +} + +void CSVWriter::WriteRawString(const string &raw_string) { + if (shared) { + lock_guard flock(lock); + bytes_written += raw_string.size(); + write_stream.WriteData(const_data_ptr_cast(raw_string.c_str()), raw_string.size()); + } else { + bytes_written += raw_string.size(); + write_stream.WriteData(const_data_ptr_cast(raw_string.c_str()), raw_string.size()); + } +} + +void CSVWriter::WriteRawString(const string &prefix, CSVWriterState &local_state) { + local_state.stream->WriteData(const_data_ptr_cast(prefix.c_str()), prefix.size()); + + if (!local_state.require_manual_flush && local_state.stream->GetPosition() >= writer_options.flush_size) { + Flush(local_state); + } +} + +void CSVWriter::WriteHeader() { + CSVWriterState state; + WriteHeader(*state.stream, options, writer_options); + state.written_anything = true; + Flush(state); +} + +void CSVWriter::Flush(CSVWriterState &local_state) { + if (shared) { + lock_guard flock(lock); + FlushInternal(local_state); + } else { + FlushInternal(local_state); + } +} + +void CSVWriter::Flush() { + // Method intended for non-shared use only + D_ASSERT(!shared); + FlushInternal(*global_write_state); +} + +void CSVWriter::Reset(optional_ptr local_state) { + if (shared) { + lock_guard flock(lock); + ResetInternal(local_state); + } else { + ResetInternal(local_state); + } +} + +void CSVWriter::Close() { + if (shared) { + lock_guard flock(lock); + if (file_writer) { + file_writer->Close(); + } + } else { + if (file_writer) { + file_writer->Close(); + } + } +} + +void CSVWriter::FlushInternal(CSVWriterState &local_state) { + if (!local_state.written_anything) { + return; + } + + if (!written_anything) { + written_anything = true; + } else if (writer_options.newline_writing_mode == CSVNewLineMode::WRITE_BEFORE) { + write_stream.WriteData(const_data_ptr_cast(writer_options.newline.c_str()), writer_options.newline.size()); + } + + written_anything = true; + bytes_written += local_state.stream->GetPosition(); + write_stream.WriteData(local_state.stream->GetData(), local_state.stream->GetPosition()); + + local_state.Reset(); +} + +void CSVWriter::ResetInternal(optional_ptr local_state) { + if (local_state) { + local_state->Reset(); + } + + written_anything = false; + bytes_written = 0; +} + +unique_ptr CSVWriter::InitializeLocalWriteState(ClientContext &context, idx_t flush_size) { + auto res = make_uniq(context, flush_size); + res->stream = make_uniq(); + return res; +} + +unique_ptr CSVWriter::InitializeLocalWriteState(DatabaseInstance &db, idx_t flush_size) { + auto res = make_uniq(db, flush_size); + res->stream = make_uniq(); + return res; +} + +idx_t CSVWriter::BytesWritten() { + if (shared) { + lock_guard flock(lock); + return bytes_written; + } + return bytes_written; +} + +static idx_t GetFileSize(unique_ptr &file_writer, idx_t &bytes_written) { + if (file_writer) { + return file_writer->GetFileSize(); + } + return bytes_written; +} + +idx_t CSVWriter::FileSize() { + if (shared) { + lock_guard flock(lock); + return GetFileSize(file_writer, bytes_written); + } + return GetFileSize(file_writer, bytes_written); +} + +void CSVWriter::WriteQuoteOrEscape(WriteStream &writer, char quote_or_escape) { + if (quote_or_escape != '\0') { + writer.Write(quote_or_escape); + } +} + +string CSVWriter::AddEscapes(char to_be_escaped, char escape, const string &val) { + idx_t i = 0; + string new_val = ""; + idx_t found = val.find(to_be_escaped); + + while (found != string::npos) { + while (i < found) { + new_val += val[i]; + i++; + } + if (escape != '\0') { + new_val += escape; + found = val.find(to_be_escaped, found + 1); + } + } + while (i < val.length()) { + new_val += val[i]; + i++; + } + return new_val; +} + +bool CSVWriter::RequiresQuotes(const char *str, idx_t len, const string &null_str, + const vector &requires_quotes) { + // check if the string is equal to the null string + if (len == null_str.size() && memcmp(str, null_str.c_str(), len) == 0) { + return true; + } + auto str_data = const_data_ptr_cast(str); + for (idx_t i = 0; i < len; i++) { + if (requires_quotes[str_data[i]]) { + // this byte requires quotes - write a quoted string + return true; + } + } + // no newline, quote or delimiter in the string + // no quoting or escaping necessary + return false; +} + +void CSVWriter::WriteQuotedString(WriteStream &writer, const char *str, idx_t len, idx_t col_idx, + CSVReaderOptions &options, CSVWriterOptions &writer_options) { + WriteQuotedString(writer, str, len, options.force_quote[col_idx], options.null_str[0], + writer_options.requires_quotes, options.dialect_options.state_machine_options.quote.GetValue(), + options.dialect_options.state_machine_options.escape.GetValue()); +} + +void CSVWriter::WriteQuotedString(WriteStream &writer, const char *str, idx_t len, bool force_quote, + const string &null_str, const vector &requires_quotes, char quote, + char escape) { + if (!force_quote) { + // force quote is disabled: check if we need to add quotes anyway + force_quote = RequiresQuotes(str, len, null_str, requires_quotes); + } + // If a quote is set to none (i.e., null-terminator) we skip the quotation + if (force_quote && quote != '\0') { + // quoting is enabled: we might need to escape things in the string + bool requires_escape = false; + // simple CSV + // do a single loop to check for a quote or escape value + for (idx_t i = 0; i < len; i++) { + if (str[i] == quote || str[i] == escape) { + requires_escape = true; + break; + } + } + + if (!requires_escape) { + // fast path: no need to escape anything + WriteQuoteOrEscape(writer, quote); + writer.WriteData(const_data_ptr_cast(str), len); + WriteQuoteOrEscape(writer, quote); + return; + } + + // slow path: need to add escapes + string new_val(str, len); + new_val = AddEscapes(escape, escape, new_val); + if (escape != quote) { + // need to escape quotes separately + new_val = AddEscapes(quote, escape, new_val); + } + WriteQuoteOrEscape(writer, quote); + writer.WriteData(const_data_ptr_cast(new_val.c_str()), new_val.size()); + WriteQuoteOrEscape(writer, quote); + } else { + writer.WriteData(const_data_ptr_cast(str), len); + } +} + +// Write a chunk to a csv file +void CSVWriter::WriteChunk(DataChunk &input, MemoryStream &writer, CSVReaderOptions &options, bool &written_anything, + CSVWriterOptions &writer_options) { + // now loop over the vectors and output the values + for (idx_t row_idx = 0; row_idx < input.size(); row_idx++) { + if (row_idx == 0 && !written_anything) { + written_anything = true; + } else if (writer_options.newline_writing_mode == CSVNewLineMode::WRITE_BEFORE) { + writer.WriteData(const_data_ptr_cast(writer_options.newline.c_str()), writer_options.newline.size()); + } + // write values + D_ASSERT(options.null_str.size() == 1); + for (idx_t col_idx = 0; col_idx < input.ColumnCount(); col_idx++) { + if (col_idx != 0) { + CSVWriter::WriteQuoteOrEscape(writer, + options.dialect_options.state_machine_options.delimiter.GetValue()[0]); + } + if (FlatVector::IsNull(input.data[col_idx], row_idx)) { + // write null value + writer.WriteData(const_data_ptr_cast(options.null_str[0].c_str()), options.null_str[0].size()); + continue; + } + + // non-null value, fetch the string value from the cast chunk + auto str_data = FlatVector::GetData(input.data[col_idx]); + // FIXME: we could gain some performance here by checking for certain types if they ever require quotes + // (e.g. integers only require quotes if the delimiter is a number, decimals only require quotes if the + // delimiter is a number or "." character) + + WriteQuotedString(writer, str_data[row_idx].GetData(), str_data[row_idx].GetSize(), col_idx, options, + writer_options); + } + if (writer_options.newline_writing_mode == CSVNewLineMode::WRITE_AFTER) { + writer.WriteData(const_data_ptr_cast(writer_options.newline.c_str()), writer_options.newline.size()); + } + } +} + +void CSVWriter::WriteHeader(MemoryStream &stream, CSVReaderOptions &options, CSVWriterOptions &writer_options) { + for (idx_t i = 0; i < options.name_list.size(); i++) { + if (i != 0) { + WriteQuoteOrEscape(stream, options.dialect_options.state_machine_options.delimiter.GetValue()[0]); + } + + WriteQuotedString(stream, options.name_list[i].c_str(), options.name_list[i].size(), i, options, + writer_options); + } +} + +} // namespace duckdb diff --git a/src/duckdb/src/common/file_system.cpp b/src/duckdb/src/common/file_system.cpp index 420470de8..0dc916c06 100644 --- a/src/duckdb/src/common/file_system.cpp +++ b/src/duckdb/src/common/file_system.cpp @@ -62,6 +62,7 @@ constexpr FileOpenFlags FileFlags::FILE_FLAGS_PARALLEL_ACCESS; constexpr FileOpenFlags FileFlags::FILE_FLAGS_EXCLUSIVE_CREATE; constexpr FileOpenFlags FileFlags::FILE_FLAGS_NULL_IF_EXISTS; constexpr FileOpenFlags FileFlags::FILE_FLAGS_MULTI_CLIENT_ACCESS; +constexpr FileOpenFlags FileFlags::FILE_FLAGS_DISABLE_LOGGING; void FileOpenFlags::Verify() { #ifdef DEBUG @@ -604,6 +605,10 @@ void FileSystem::SetDisabledFileSystems(const vector &names) { throw NotImplementedException("%s: Can't disable file systems on a non-virtual file system", GetName()); } +bool FileSystem::SubSystemIsDisabled(const string &name) { + throw NotImplementedException("%s: Non-virtual file system does not have subsystems", GetName()); +} + vector FileSystem::ListSubSystems() { throw NotImplementedException("%s: Can't list sub systems on a non-virtual file system", GetName()); } @@ -798,11 +803,16 @@ FileType FileHandle::GetType() { } void FileHandle::TryAddLogger(FileOpener &opener) { + if (flags.DisableLogging()) { + return; + } + auto context = opener.TryGetClientContext(); if (context && Logger::Get(*context).ShouldLog(FileSystemLogType::NAME, FileSystemLogType::LEVEL)) { logger = context->logger; return; } + auto database = opener.TryGetDatabase(); if (database && Logger::Get(*database).ShouldLog(FileSystemLogType::NAME, FileSystemLogType::LEVEL)) { logger = database->GetLogManager().GlobalLoggerReference(); diff --git a/src/duckdb/src/common/serializer/buffered_file_writer.cpp b/src/duckdb/src/common/serializer/buffered_file_writer.cpp index f0d811921..a5b255502 100644 --- a/src/duckdb/src/common/serializer/buffered_file_writer.cpp +++ b/src/duckdb/src/common/serializer/buffered_file_writer.cpp @@ -90,6 +90,10 @@ void BufferedFileWriter::Truncate(idx_t size) { handle->Truncate(NumericCast(size)); // reset anything written in the buffer offset = 0; + // Reset the seek position if applicable + if (handle->CanSeek() && handle->SeekPosition() > size) { + handle->Seek(size); + } } } diff --git a/src/duckdb/src/common/virtual_file_system.cpp b/src/duckdb/src/common/virtual_file_system.cpp index 559e140f5..757ebef16 100644 --- a/src/duckdb/src/common/virtual_file_system.cpp +++ b/src/duckdb/src/common/virtual_file_system.cpp @@ -212,6 +212,10 @@ void VirtualFileSystem::SetDisabledFileSystems(const vector &names) { disabled_file_systems = std::move(new_disabled_file_systems); } +bool VirtualFileSystem::SubSystemIsDisabled(const string &name) { + return disabled_file_systems.find(name) != disabled_file_systems.end(); +} + FileSystem &VirtualFileSystem::FindFileSystem(const string &path) { auto &fs = FindFileSystemInternal(path); if (!disabled_file_systems.empty() && disabled_file_systems.find(fs.GetName()) != disabled_file_systems.end()) { diff --git a/src/duckdb/src/execution/operator/csv_scanner/buffer_manager/csv_file_handle.cpp b/src/duckdb/src/execution/operator/csv_scanner/buffer_manager/csv_file_handle.cpp index 2d4b5388f..24fd6a67f 100644 --- a/src/duckdb/src/execution/operator/csv_scanner/buffer_manager/csv_file_handle.cpp +++ b/src/duckdb/src/execution/operator/csv_scanner/buffer_manager/csv_file_handle.cpp @@ -76,6 +76,14 @@ bool CSVFileHandle::FinishedReading() const { } idx_t CSVFileHandle::Read(void *buffer, idx_t nr_bytes) { + // We avoid reading past the original size of the file for uncompressed files in utf-8 encoding. This avoids reading + // the data that is written after opening the file. This can be useful, for example when reading a duckdb log file + // in csv format while logging is enabled + if (file_handle->GetFileCompressionType() == FileCompressionType::UNCOMPRESSED && file_handle->CanSeek() && + encoder.encoding_name == "utf-8") { + nr_bytes = MinValue(nr_bytes, file_size - file_handle->SeekPosition()); + } + requested_bytes += nr_bytes; // if this is a plain file source OR we can seek we are not caching anything idx_t bytes_read = 0; diff --git a/src/duckdb/src/execution/physical_plan/plan_comparison_join.cpp b/src/duckdb/src/execution/physical_plan/plan_comparison_join.cpp index fb499d2d3..aeb77a800 100644 --- a/src/duckdb/src/execution/physical_plan/plan_comparison_join.cpp +++ b/src/duckdb/src/execution/physical_plan/plan_comparison_join.cpp @@ -1,4 +1,3 @@ -#include "duckdb/catalog/catalog_entry/duck_table_entry.hpp" #include "duckdb/execution/operator/join/perfect_hash_join_executor.hpp" #include "duckdb/execution/operator/join/physical_blockwise_nl_join.hpp" #include "duckdb/execution/operator/join/physical_cross_product.hpp" @@ -6,14 +5,11 @@ #include "duckdb/execution/operator/join/physical_iejoin.hpp" #include "duckdb/execution/operator/join/physical_nested_loop_join.hpp" #include "duckdb/execution/operator/join/physical_piecewise_merge_join.hpp" -#include "duckdb/execution/operator/scan/physical_table_scan.hpp" #include "duckdb/execution/physical_plan_generator.hpp" -#include "duckdb/function/table/table_scan.hpp" #include "duckdb/main/client_context.hpp" #include "duckdb/planner/expression/bound_reference_expression.hpp" #include "duckdb/planner/expression_iterator.hpp" #include "duckdb/planner/operator/logical_comparison_join.hpp" -#include "duckdb/transaction/duck_transaction.hpp" #include "duckdb/main/settings.hpp" namespace duckdb { @@ -68,15 +64,15 @@ PhysicalOperator &PhysicalPlanGenerator::PlanComparisonJoin(LogicalComparisonJoi D_ASSERT(op.left_projection_map.empty()); idx_t nested_loop_join_threshold = DBConfig::GetSetting(context); - if (left.estimated_cardinality <= nested_loop_join_threshold || - right.estimated_cardinality <= nested_loop_join_threshold) { + if (left.estimated_cardinality < nested_loop_join_threshold || + right.estimated_cardinality < nested_loop_join_threshold) { can_iejoin = false; can_merge = false; } if (can_merge && can_iejoin) { idx_t merge_join_threshold = DBConfig::GetSetting(context); - if (left.estimated_cardinality <= merge_join_threshold || right.estimated_cardinality <= merge_join_threshold) { + if (left.estimated_cardinality < merge_join_threshold || right.estimated_cardinality < merge_join_threshold) { can_iejoin = false; } } diff --git a/src/duckdb/src/function/pragma/pragma_functions.cpp b/src/duckdb/src/function/pragma/pragma_functions.cpp index ad1a3ec3a..9dc92724b 100644 --- a/src/duckdb/src/function/pragma/pragma_functions.cpp +++ b/src/duckdb/src/function/pragma/pragma_functions.cpp @@ -95,10 +95,6 @@ static void PragmaForceCheckpoint(ClientContext &context, const FunctionParamete DBConfig::GetConfig(context).options.force_checkpoint = true; } -static void PragmaTruncateDuckDBLogs(ClientContext &context, const FunctionParameters ¶meters) { - context.db->GetLogManager().TruncateLogStorage(); -} - static void PragmaDisableForceParallelism(ClientContext &context, const FunctionParameters ¶meters) { ClientConfig::GetConfig(context).verify_parallelism = false; } @@ -117,35 +113,6 @@ static void PragmaDisableCheckpointOnShutdown(ClientContext &context, const Func DBConfig::GetConfig(context).options.checkpoint_on_shutdown = false; } -static void PragmaEnableLogging(ClientContext &context, const FunctionParameters ¶meters) { - if (parameters.values.empty()) { - context.db->GetLogManager().SetEnableLogging(true); - return; - } - - if (parameters.values.size() != 1) { - throw InvalidInputException("PragmaEnableLogging: expected 0 or 1 parameter"); - } - - vector types; - - if (parameters.values[0].type() == LogicalType::VARCHAR) { - types.push_back(parameters.values[0].GetValue()); - } else if (parameters.values[0].type() == LogicalType::LIST(LogicalType::VARCHAR)) { - for (const auto &child : ListValue::GetChildren(parameters.values[0])) { - types.push_back(child.GetValue()); - } - } else { - throw InvalidInputException("Unexpected type for PragmaEnableLogging"); - } - - context.db->GetLogManager().SetEnableStructuredLoggers(types); -} - -static void PragmaDisableLogging(ClientContext &context, const FunctionParameters ¶meters) { - context.db->GetLogManager().SetEnableLogging(false); -} - static void PragmaEnableOptimizer(ClientContext &context, const FunctionParameters ¶meters) { ClientConfig::GetConfig(context).enable_optimizer = true; } @@ -157,6 +124,10 @@ static void PragmaDisableOptimizer(ClientContext &context, const FunctionParamet void PragmaFunctions::RegisterFunction(BuiltinFunctions &set) { RegisterEnableProfiling(set); + // NOTE: use of Pragma functions is discouraged. Instead, opt for adding a regular table function to be invoked with + // CALL. + // see for example the "enable_logging" function + set.AddFunction(PragmaFunction::PragmaStatement("disable_profile", PragmaDisableProfiling)); set.AddFunction(PragmaFunction::PragmaStatement("disable_profiling", PragmaDisableProfiling)); @@ -178,16 +149,11 @@ void PragmaFunctions::RegisterFunction(BuiltinFunctions &set) { set.AddFunction(PragmaFunction::PragmaStatement("enable_object_cache", PragmaEnableObjectCache)); set.AddFunction(PragmaFunction::PragmaStatement("disable_object_cache", PragmaDisableObjectCache)); - set.AddFunction(PragmaFunction::PragmaCall("enable_logging", PragmaEnableLogging, {}, LogicalType::VARCHAR)); - set.AddFunction(PragmaFunction::PragmaStatement("disable_logging", PragmaDisableLogging)); - set.AddFunction(PragmaFunction::PragmaStatement("enable_optimizer", PragmaEnableOptimizer)); set.AddFunction(PragmaFunction::PragmaStatement("disable_optimizer", PragmaDisableOptimizer)); set.AddFunction(PragmaFunction::PragmaStatement("force_checkpoint", PragmaForceCheckpoint)); - set.AddFunction(PragmaFunction::PragmaStatement("truncate_duckdb_logs", PragmaTruncateDuckDBLogs)); - set.AddFunction(PragmaFunction::PragmaStatement("enable_progress_bar", PragmaEnableProgressBar)); set.AddFunction(PragmaFunction::PragmaStatement("disable_progress_bar", PragmaDisableProgressBar)); diff --git a/src/duckdb/src/function/table/copy_csv.cpp b/src/duckdb/src/function/table/copy_csv.cpp index 6987f91a0..806257c1f 100644 --- a/src/duckdb/src/function/table/copy_csv.cpp +++ b/src/duckdb/src/function/table/copy_csv.cpp @@ -1,4 +1,5 @@ #include "duckdb/common/bind_helpers.hpp" +#include "duckdb/common/csv_writer.hpp" #include "duckdb/common/file_system.hpp" #include "duckdb/common/multi_file/multi_file_reader.hpp" #include "duckdb/common/serializer/memory_stream.hpp" @@ -20,8 +21,6 @@ #include "duckdb/planner/expression/bound_reference_expression.hpp" #include "duckdb/common/multi_file/multi_file_function.hpp" -#include - namespace duckdb { void AreOptionsEqual(char str_1, char str_2, const string &name_str_1, const string &name_str_2) { @@ -54,11 +53,6 @@ void StringDetection(const string &str_1, const string &str_2, const string &nam //===--------------------------------------------------------------------===// // Bind //===--------------------------------------------------------------------===// -void WriteQuoteOrEscape(WriteStream &writer, char quote_or_escape) { - if (quote_or_escape != '\0') { - writer.Write(quote_or_escape); - } -} void BaseCSVData::Finalize() { auto delimiter_string = options.dialect_options.state_machine_options.delimiter.GetValue(); @@ -120,12 +114,6 @@ void BaseCSVData::Finalize() { } } -string TransformNewLine(string new_line) { - new_line = StringUtil::Replace(new_line, "\\r", "\r"); - return StringUtil::Replace(new_line, "\\n", "\n"); - ; -} - static vector> CreateCastExpressions(WriteCSVData &bind_data, ClientContext &context, const vector &names, const vector &sql_types) { @@ -184,7 +172,7 @@ static vector> CreateCastExpressions(WriteCSVData &bind_d static unique_ptr WriteCSVBind(ClientContext &context, CopyFunctionBindInput &input, const vector &names, const vector &sql_types) { - auto bind_data = make_uniq(input.info.file_path, sql_types, names); + auto bind_data = make_uniq(names); // check all the options in the copy info for (auto &option : input.info.options) { @@ -217,172 +205,38 @@ static unique_ptr WriteCSVBind(ClientContext &context, CopyFunctio auto expressions = CreateCastExpressions(*bind_data, context, names, sql_types); bind_data->cast_expressions = std::move(expressions); - bind_data->requires_quotes = make_unsafe_uniq_array(256); - memset(bind_data->requires_quotes.get(), 0, sizeof(bool) * 256); - bind_data->requires_quotes['\n'] = true; - bind_data->requires_quotes['\r'] = true; - bind_data->requires_quotes['#'] = true; - bind_data->requires_quotes[NumericCast( - bind_data->options.dialect_options.state_machine_options.delimiter.GetValue()[0])] = true; - bind_data->requires_quotes[NumericCast( - bind_data->options.dialect_options.state_machine_options.quote.GetValue())] = true; - - if (!bind_data->options.write_newline.empty()) { - bind_data->newline = TransformNewLine(bind_data->options.write_newline); - } return std::move(bind_data); } -//===--------------------------------------------------------------------===// -// Helper writing functions -//===--------------------------------------------------------------------===// -static string AddEscapes(char to_be_escaped, const char escape, const string &val) { - idx_t i = 0; - string new_val = ""; - idx_t found = val.find(to_be_escaped); - - while (found != string::npos) { - while (i < found) { - new_val += val[i]; - i++; - } - if (escape != '\0') { - new_val += escape; - found = val.find(to_be_escaped, found + 1); - } - } - while (i < val.length()) { - new_val += val[i]; - i++; - } - return new_val; -} - -static bool RequiresQuotes(WriteCSVData &csv_data, const char *str, idx_t len) { - auto &options = csv_data.options; - // check if the string is equal to the null string - if (len == options.null_str[0].size() && memcmp(str, options.null_str[0].c_str(), len) == 0) { - return true; - } - auto str_data = reinterpret_cast(str); - for (idx_t i = 0; i < len; i++) { - if (csv_data.requires_quotes[str_data[i]]) { - // this byte requires quotes - write a quoted string - return true; - } - } - // no newline, quote or delimiter in the string - // no quoting or escaping necessary - return false; -} - -static void WriteQuotedString(WriteStream &writer, WriteCSVData &csv_data, const char *str, idx_t len, - bool force_quote) { - auto &options = csv_data.options; - if (!force_quote) { - // force quote is disabled: check if we need to add quotes anyway - force_quote = RequiresQuotes(csv_data, str, len); - } - // If a quote is set to none (i.e., null-terminator) we skip the quotation - if (force_quote && options.dialect_options.state_machine_options.quote.GetValue() != '\0') { - // quoting is enabled: we might need to escape things in the string - bool requires_escape = false; - // simple CSV - // do a single loop to check for a quote or escape value - for (idx_t i = 0; i < len; i++) { - if (str[i] == options.dialect_options.state_machine_options.quote.GetValue() || - str[i] == options.dialect_options.state_machine_options.escape.GetValue()) { - requires_escape = true; - break; - } - } - - if (!requires_escape) { - // fast path: no need to escape anything - WriteQuoteOrEscape(writer, options.dialect_options.state_machine_options.quote.GetValue()); - writer.WriteData(const_data_ptr_cast(str), len); - WriteQuoteOrEscape(writer, options.dialect_options.state_machine_options.quote.GetValue()); - return; - } - - // slow path: need to add escapes - string new_val(str, len); - new_val = AddEscapes(options.dialect_options.state_machine_options.escape.GetValue(), - options.dialect_options.state_machine_options.escape.GetValue(), new_val); - if (options.dialect_options.state_machine_options.escape != - options.dialect_options.state_machine_options.quote) { - // need to escape quotes separately - new_val = AddEscapes(options.dialect_options.state_machine_options.quote.GetValue(), - options.dialect_options.state_machine_options.escape.GetValue(), new_val); - } - WriteQuoteOrEscape(writer, options.dialect_options.state_machine_options.quote.GetValue()); - writer.WriteData(const_data_ptr_cast(new_val.c_str()), new_val.size()); - WriteQuoteOrEscape(writer, options.dialect_options.state_machine_options.quote.GetValue()); - } else { - writer.WriteData(const_data_ptr_cast(str), len); - } -} - //===--------------------------------------------------------------------===// // Sink //===--------------------------------------------------------------------===// struct LocalWriteCSVData : public LocalFunctionData { public: LocalWriteCSVData(ClientContext &context, vector> &expressions, const idx_t &flush_size) - : executor(context, expressions), stream(BufferAllocator::Get(context), flush_size) { + : executor(context, expressions), writer_local_state(context, flush_size) { } public: //! Used to execute the expressions that transform input -> string ExpressionExecutor executor; - //! The thread-local buffer to write data into - MemoryStream stream; //! A chunk with VARCHAR columns to cast intermediates into DataChunk cast_chunk; - //! If we've written any rows yet, allows us to prevent a trailing comma when writing JSON ARRAY - bool written_anything = false; + //! Local state for the CSV writer + CSVWriterState writer_local_state; }; struct GlobalWriteCSVData : public GlobalFunctionData { - GlobalWriteCSVData(FileSystem &fs, const string &file_path, FileCompressionType compression) - : fs(fs), written_anything(false) { - handle = fs.OpenFile(file_path, FileFlags::FILE_FLAGS_WRITE | FileFlags::FILE_FLAGS_FILE_CREATE_NEW | - FileLockType::WRITE_LOCK | compression); - } - - //! Write generic data, e.g., CSV header - void WriteData(const_data_ptr_t data, idx_t size) { - lock_guard flock(lock); - handle->Write((void *)data, size); - } - - void WriteData(const char *data, idx_t size) { - WriteData(const_data_ptr_cast(data), size); - } - - //! Write rows - void WriteRows(const_data_ptr_t data, idx_t size, const string &newline) { - lock_guard flock(lock); - if (written_anything) { - handle->Write((void *)newline.c_str(), newline.length()); - } else { - written_anything = true; - } - handle->Write((void *)data, size); + GlobalWriteCSVData(CSVReaderOptions &options, FileSystem &fs, const string &file_path, + FileCompressionType compression) + : writer(options, fs, file_path, compression) { } idx_t FileSize() { - lock_guard flock(lock); - return handle->GetFileSize(); + return writer.FileSize(); } - FileSystem &fs; - //! The mutex for writing to the physical file - mutex lock; - //! The file handle to write to - unique_ptr handle; - //! If we've written any rows yet, allows us to prevent a trailing comma when writing JSON ARRAY - bool written_anything; + CSVWriter writer; }; static unique_ptr WriteCSVInitializeLocal(ExecutionContext &context, FunctionData &bind_data) { @@ -402,36 +256,15 @@ static unique_ptr WriteCSVInitializeGlobal(ClientContext &co auto &csv_data = bind_data.Cast(); auto &options = csv_data.options; auto global_data = - make_uniq(FileSystem::GetFileSystem(context), file_path, options.compression); + make_uniq(options, FileSystem::GetFileSystem(context), file_path, options.compression); - if (!options.prefix.empty()) { - global_data->WriteData(options.prefix.c_str(), options.prefix.size()); - } - - if (!(options.dialect_options.header.IsSetByUser() && !options.dialect_options.header.GetValue())) { - MemoryStream stream(BufferAllocator::Get(context), csv_data.flush_size); - // write the header line to the file - for (idx_t i = 0; i < csv_data.options.name_list.size(); i++) { - if (i != 0) { - WriteQuoteOrEscape(stream, options.dialect_options.state_machine_options.delimiter.GetValue()[0]); - } - WriteQuotedString(stream, csv_data, csv_data.options.name_list[i].c_str(), - csv_data.options.name_list[i].size(), false); - } - stream.WriteData(const_data_ptr_cast(csv_data.newline.c_str()), csv_data.newline.size()); - - global_data->WriteData(stream.GetData(), stream.GetPosition()); - } + global_data->writer.Initialize(); return std::move(global_data); } -static void WriteCSVChunkInternal(ClientContext &context, FunctionData &bind_data, DataChunk &cast_chunk, - MemoryStream &writer, DataChunk &input, bool &written_anything, - ExpressionExecutor &executor) { - auto &csv_data = bind_data.Cast(); - auto &options = csv_data.options; - +static void WriteCSVChunkInternal(CSVWriter &writer, CSVWriterState &writer_local_state, DataChunk &cast_chunk, + DataChunk &input, ExpressionExecutor &executor) { // first cast the columns of the chunk to varchar cast_chunk.Reset(); cast_chunk.SetCardinality(input); @@ -439,53 +272,17 @@ static void WriteCSVChunkInternal(ClientContext &context, FunctionData &bind_dat executor.Execute(input, cast_chunk); cast_chunk.Flatten(); - // now loop over the vectors and output the values - for (idx_t row_idx = 0; row_idx < cast_chunk.size(); row_idx++) { - if (row_idx == 0 && !written_anything) { - written_anything = true; - } else { - writer.WriteData(const_data_ptr_cast(csv_data.newline.c_str()), csv_data.newline.size()); - } - // write values - D_ASSERT(options.null_str.size() == 1); - for (idx_t col_idx = 0; col_idx < cast_chunk.ColumnCount(); col_idx++) { - if (col_idx != 0) { - WriteQuoteOrEscape(writer, options.dialect_options.state_machine_options.delimiter.GetValue()[0]); - } - if (FlatVector::IsNull(cast_chunk.data[col_idx], row_idx)) { - // write null value - writer.WriteData(const_data_ptr_cast(options.null_str[0].c_str()), options.null_str[0].size()); - continue; - } - // non-null value, fetch the string value from the cast chunk - auto str_data = FlatVector::GetData(cast_chunk.data[col_idx]); - // FIXME: we could gain some performance here by checking for certain types if they ever require quotes - // (e.g. integers only require quotes if the delimiter is a number, decimals only require quotes if the - // delimiter is a number or "." character) - WriteQuotedString(writer, csv_data, str_data[row_idx].GetData(), str_data[row_idx].GetSize(), - csv_data.options.force_quote[col_idx]); - } - } + writer.WriteChunk(cast_chunk, writer_local_state); } static void WriteCSVSink(ExecutionContext &context, FunctionData &bind_data, GlobalFunctionData &gstate, LocalFunctionData &lstate, DataChunk &input) { - auto &csv_data = bind_data.Cast(); auto &local_data = lstate.Cast(); auto &global_state = gstate.Cast(); - // write data into the local buffer - WriteCSVChunkInternal(context.client, bind_data, local_data.cast_chunk, local_data.stream, input, - local_data.written_anything, local_data.executor); - - // check if we should flush what we have currently written - auto &writer = local_data.stream; - if (writer.GetPosition() >= csv_data.flush_size) { - global_state.WriteRows(writer.GetData(), writer.GetPosition(), csv_data.newline); - writer.Rewind(); - local_data.written_anything = false; - } + WriteCSVChunkInternal(global_state.writer, local_data.writer_local_state, local_data.cast_chunk, input, + local_data.executor); } //===--------------------------------------------------------------------===// @@ -495,13 +292,7 @@ static void WriteCSVCombine(ExecutionContext &context, FunctionData &bind_data, LocalFunctionData &lstate) { auto &local_data = lstate.Cast(); auto &global_state = gstate.Cast(); - auto &csv_data = bind_data.Cast(); - auto &writer = local_data.stream; - // flush the local writer - if (local_data.written_anything) { - global_state.WriteRows(writer.GetData(), writer.GetPosition(), csv_data.newline); - writer.Rewind(); - } + global_state.writer.Flush(local_data.writer_local_state); } //===--------------------------------------------------------------------===// @@ -512,16 +303,12 @@ void WriteCSVFinalize(ClientContext &context, FunctionData &bind_data, GlobalFun auto &csv_data = bind_data.Cast(); auto &options = csv_data.options; - MemoryStream stream(BufferAllocator::Get(context), csv_data.flush_size); if (!options.suffix.empty()) { - stream.WriteData(const_data_ptr_cast(options.suffix.c_str()), options.suffix.size()); - } else if (global_state.written_anything) { - stream.WriteData(const_data_ptr_cast(csv_data.newline.c_str()), csv_data.newline.size()); + global_state.writer.WriteRawString(options.suffix); + } else if (global_state.writer.WrittenAnything()) { + global_state.writer.WriteRawString(global_state.writer.writer_options.newline); } - global_state.WriteData(stream.GetData(), stream.GetPosition()); - - global_state.handle->Close(); - global_state.handle.reset(); + global_state.writer.Close(); } //===--------------------------------------------------------------------===// @@ -540,11 +327,13 @@ CopyFunctionExecutionMode WriteCSVExecutionMode(bool preserve_insertion_order, b // Prepare Batch //===--------------------------------------------------------------------===// struct WriteCSVBatchData : public PreparedBatchData { - explicit WriteCSVBatchData(Allocator &allocator, const idx_t flush_size) : stream(allocator, flush_size) { + explicit WriteCSVBatchData(ClientContext &context, const idx_t flush_size) + : writer_local_state(make_uniq(context, flush_size)) { + writer_local_state->require_manual_flush = true; } //! The thread-local buffer to write data into - MemoryStream stream; + unique_ptr writer_local_state; }; unique_ptr WriteCSVPrepareBatch(ClientContext &context, FunctionData &bind_data, @@ -561,12 +350,12 @@ unique_ptr WriteCSVPrepareBatch(ClientContext &context, Funct auto &original_types = collection->Types(); auto expressions = CreateCastExpressions(csv_data, context, csv_data.options.name_list, original_types); ExpressionExecutor executor(context, expressions); + auto &global_state = gstate.Cast(); // write CSV chunks to the batch data - bool written_anything = false; - auto batch = make_uniq(BufferAllocator::Get(context), NextPowerOfTwo(collection->SizeInBytes())); + auto batch = make_uniq(context, NextPowerOfTwo(collection->SizeInBytes())); for (auto &chunk : collection->Chunks()) { - WriteCSVChunkInternal(context, bind_data, cast_chunk, batch->stream, chunk, written_anything, executor); + WriteCSVChunkInternal(global_state.writer, *batch->writer_local_state, cast_chunk, chunk, executor); } return std::move(batch); } @@ -578,10 +367,7 @@ void WriteCSVFlushBatch(ClientContext &context, FunctionData &bind_data, GlobalF PreparedBatchData &batch) { auto &csv_batch = batch.Cast(); auto &global_state = gstate.Cast(); - auto &csv_data = bind_data.Cast(); - auto &writer = csv_batch.stream; - global_state.WriteRows(writer.GetData(), writer.GetPosition(), csv_data.newline); - writer.Rewind(); + global_state.writer.Flush(*csv_batch.writer_local_state); } //===--------------------------------------------------------------------===// diff --git a/src/duckdb/src/function/table/system/duckdb_log.cpp b/src/duckdb/src/function/table/system/duckdb_log.cpp index c0bf42975..2be0531c9 100644 --- a/src/duckdb/src/function/table/system/duckdb_log.cpp +++ b/src/duckdb/src/function/table/system/duckdb_log.cpp @@ -11,8 +11,8 @@ namespace duckdb { struct DuckDBLogData : public GlobalTableFunctionState { explicit DuckDBLogData(shared_ptr log_storage_p) : log_storage(std::move(log_storage_p)) { - scan_state = log_storage->CreateScanEntriesState(); - log_storage->InitializeScanEntries(*scan_state); + scan_state = log_storage->CreateScanState(LoggingTargetTable::LOG_ENTRIES); + log_storage->InitializeScan(*scan_state); } DuckDBLogData() : log_storage(nullptr) { } @@ -43,7 +43,7 @@ static unique_ptr DuckDBLogBind(ClientContext &context, TableFunct } unique_ptr DuckDBLogInit(ClientContext &context, TableFunctionInitInput &input) { - if (LogManager::Get(context).CanScan()) { + if (LogManager::Get(context).CanScan(LoggingTargetTable::LOG_ENTRIES)) { return make_uniq(LogManager::Get(context).GetLogStorage()); } return make_uniq(); @@ -52,12 +52,48 @@ unique_ptr DuckDBLogInit(ClientContext &context, Table void DuckDBLogFunction(ClientContext &context, TableFunctionInput &data_p, DataChunk &output) { auto &data = data_p.global_state->Cast(); if (data.log_storage) { - data.log_storage->ScanEntries(*data.scan_state, output); + data.log_storage->Scan(*data.scan_state, output); } } +unique_ptr DuckDBLogBindReplace(ClientContext &context, TableFunctionBindInput &input) { + auto log_storage = LogManager::Get(context).GetLogStorage(); + + bool denormalized_table = false; + auto denormalized_table_setting = input.named_parameters.find("denormalized_table"); + if (denormalized_table_setting != input.named_parameters.end()) { + denormalized_table = denormalized_table_setting->second.GetValue(); + } + + // Without join contexts we simply scan the LOG_ENTRIES tables + if (!denormalized_table) { + auto res = log_storage->BindReplace(context, input, LoggingTargetTable::LOG_ENTRIES); + return res; + } + + // If the storage can bind replace for LoggingTargetTable::ALL_LOGS, we use that since that will be most efficient + auto all_log_scan = log_storage->BindReplace(context, input, LoggingTargetTable::ALL_LOGS); + if (all_log_scan) { + return all_log_scan; + } + + // We cannot scan ALL_LOGS but denormalized_table was requested: we need to inject the join between LOG_ENTRIES and + // LOG_CONTEXTS + string sub_query_string = "SELECT l.context_id, scope, connection_id, transaction_id, query_id, thread_id, " + "timestamp, type, log_level, message" + " FROM (SELECT row_number() OVER () AS rowid, * FROM duckdb_logs()) as l JOIN " + "duckdb_log_contexts() as c ON l.context_id=c.context_id order by timestamp, l.rowid;"; + Parser parser(context.GetParserOptions()); + parser.ParseQuery(sub_query_string); + auto select_stmt = unique_ptr_cast(std::move(parser.statements[0])); + + return duckdb::make_uniq(std::move(select_stmt)); +} + void DuckDBLogFun::RegisterFunction(BuiltinFunctions &set) { TableFunction logs_fun("duckdb_logs", {}, DuckDBLogFunction, DuckDBLogBind, DuckDBLogInit); + logs_fun.bind_replace = DuckDBLogBindReplace; + logs_fun.named_parameters["denormalized_table"] = LogicalType::BOOLEAN; set.AddFunction(logs_fun); } diff --git a/src/duckdb/src/function/table/system/duckdb_log_contexts.cpp b/src/duckdb/src/function/table/system/duckdb_log_contexts.cpp index fa227551f..c3ec4f39a 100644 --- a/src/duckdb/src/function/table/system/duckdb_log_contexts.cpp +++ b/src/duckdb/src/function/table/system/duckdb_log_contexts.cpp @@ -12,8 +12,8 @@ namespace duckdb { struct DuckDBLogContextData : public GlobalTableFunctionState { explicit DuckDBLogContextData(shared_ptr log_storage_p) : log_storage(std::move(log_storage_p)) { - scan_state = log_storage->CreateScanContextsState(); - log_storage->InitializeScanContexts(*scan_state); + scan_state = log_storage->CreateScanState(LoggingTargetTable::LOG_CONTEXTS); + log_storage->InitializeScan(*scan_state); } DuckDBLogContextData() : log_storage(nullptr) { } @@ -47,7 +47,7 @@ static unique_ptr DuckDBLogContextBind(ClientContext &context, Tab } unique_ptr DuckDBLogContextInit(ClientContext &context, TableFunctionInitInput &input) { - if (LogManager::Get(context).CanScan()) { + if (LogManager::Get(context).CanScan(LoggingTargetTable::LOG_CONTEXTS)) { return make_uniq(LogManager::Get(context).GetLogStorage()); } return make_uniq(); @@ -56,13 +56,22 @@ unique_ptr DuckDBLogContextInit(ClientContext &context void DuckDBLogContextFunction(ClientContext &context, TableFunctionInput &data_p, DataChunk &output) { auto &data = data_p.global_state->Cast(); if (data.log_storage) { - data.log_storage->ScanContexts(*data.scan_state, output); + data.log_storage->Scan(*data.scan_state, output); } } +static unique_ptr DuckDBLogContextsBindReplace(ClientContext &context, TableFunctionBindInput &input) { + auto log_storage = LogManager::Get(context).GetLogStorage(); + + // Attempt to let the storage BindReplace the scan function + return log_storage->BindReplace(context, input, LoggingTargetTable::LOG_CONTEXTS); +} + void DuckDBLogContextFun::RegisterFunction(BuiltinFunctions &set) { - set.AddFunction( - TableFunction("duckdb_log_contexts", {}, DuckDBLogContextFunction, DuckDBLogContextBind, DuckDBLogContextInit)); + auto fun = + TableFunction("duckdb_log_contexts", {}, DuckDBLogContextFunction, DuckDBLogContextBind, DuckDBLogContextInit); + fun.bind_replace = DuckDBLogContextsBindReplace; + set.AddFunction(fun); } } // namespace duckdb diff --git a/src/duckdb/src/function/table/system/logging_utils.cpp b/src/duckdb/src/function/table/system/logging_utils.cpp new file mode 100644 index 000000000..c33842ae4 --- /dev/null +++ b/src/duckdb/src/function/table/system/logging_utils.cpp @@ -0,0 +1,151 @@ +#include "duckdb/function/table/system_functions.hpp" + +#include "duckdb/catalog/catalog_entry/collate_catalog_entry.hpp" +#include "duckdb/common/exception.hpp" +#include "duckdb/function/table_function.hpp" +#include "duckdb/logging/log_manager.hpp" +#include "duckdb/logging/logging.hpp" +#include "duckdb/main/client_context.hpp" +#include "duckdb/main/database.hpp" + +namespace duckdb { + +class EnableLoggingBindData : public TableFunctionData { +public: + EnableLoggingBindData() { + } + + case_insensitive_map_t storage_config; + LogConfig config; + vector log_types_to_set; +}; + +static void EnableLogging(ClientContext &context, TableFunctionInput &data, DataChunk &output) { + auto bind_data = data.bind_data->Cast(); + + auto &log_manager = context.db->GetLogManager(); + + // Apply the config generated from the input + log_manager.SetConfig(*context.db, bind_data.config); + + if (bind_data.log_types_to_set.empty()) { + log_manager.SetEnableLogging(true); + log_manager.SetLogMode(LogMode::LEVEL_ONLY); + } else { + log_manager.SetEnableStructuredLoggers(bind_data.log_types_to_set); + } + + if (!bind_data.storage_config.empty()) { + log_manager.UpdateLogStorageConfig(*context.db, bind_data.storage_config); + } +} + +static unique_ptr BindEnableLogging(ClientContext &context, TableFunctionBindInput &input, + vector &return_types, vector &names) { + if (input.inputs.size() > 1) { + throw InvalidInputException("PragmaEnableLogging: expected 0 or 1 parameter"); + } + + auto result = make_uniq(); + + bool storage_isset = false; + bool storage_path_isset = false; + + for (const auto ¶m : input.named_parameters) { + auto key = StringUtil::Lower(param.first); + if (key == "level") { + result->config.level = EnumUtil::FromString(param.second.ToString()); + } else if (key == "storage") { + storage_isset = true; + result->config.storage = param.second.ToString(); + } else if (key == "storage_config") { + auto &children = StructValue::GetChildren(param.second); + for (idx_t i = 0; i < children.size(); i++) { + result->storage_config[StructType::GetChildName(param.second.type(), i)] = children[i]; + } + } else if (key == "storage_path") { + storage_path_isset = true; + result->storage_config["path"] = param.second; + } else if (key == "storage_normalize") { + result->storage_config["normalize"] = param.second; + } else if (key == "storage_buffer_size") { + result->storage_config["buffer_size"] = param.second; + } else { + throw InvalidInputException("PragmaEnableLogging: unknown named parameter: %s", param.first.c_str()); + } + } + + // This will implicitly set the log storage if the storage_path param is set and the storage is omitted + if (!storage_isset && storage_path_isset) { + result->config.storage = LogConfig::FILE_STORAGE_NAME; + } + + // Process positional params + if (!input.inputs.empty()) { + if (input.inputs[0].type() == LogicalType::VARCHAR) { + result->log_types_to_set.push_back(input.inputs[0].GetValue()); + } else if (input.inputs[0].type() == LogicalType::LIST(LogicalType::VARCHAR)) { + for (const auto &child : ListValue::GetChildren(input.inputs[0])) { + result->log_types_to_set.push_back(child.GetValue()); + } + } else { + throw BinderException("Unexpected type positional parameter to enable_logging"); + } + } + + return_types.emplace_back(LogicalType::BOOLEAN); + names.emplace_back("Success"); + + return std::move(result); +} + +//! Reset the logmanager to the defaults +static void DisableLogging(ClientContext &context, TableFunctionInput &data, DataChunk &output) { + context.db->GetLogManager().SetEnableLogging(false); +} + +//! Truncate the current log storage +static void TruncateLogs(ClientContext &context, TableFunctionInput &data, DataChunk &output) { + context.db->GetLogManager().TruncateLogStorage(); +} + +static unique_ptr BindDisableLogging(ClientContext &context, TableFunctionBindInput &input, + vector &return_types, vector &names) { + return_types.emplace_back(LogicalType::BOOLEAN); + names.emplace_back("Success"); + + return std::move(make_uniq()); +} + +static unique_ptr BindTruncateLogs(ClientContext &context, TableFunctionBindInput &input, + vector &return_types, vector &names) { + return_types.emplace_back(LogicalType::BOOLEAN); + names.emplace_back("Success"); + + return make_uniq(); +} + +void EnableLoggingFun::RegisterFunction(BuiltinFunctions &set) { + auto enable_fun = TableFunction("enable_logging", {}, EnableLogging, BindEnableLogging, nullptr, nullptr); + + // Base config + enable_fun.named_parameters.emplace("level", LogicalType::VARCHAR); + enable_fun.named_parameters.emplace("storage", LogicalType::VARCHAR); + enable_fun.named_parameters.emplace("storage_config", LogicalType::ANY); + + // Config that is forwarded to the storage_config struct as syntactic sugar + enable_fun.named_parameters.emplace("storage_path", LogicalType::VARCHAR); + enable_fun.named_parameters.emplace("storage_normalize", LogicalType::BOOLEAN); + enable_fun.named_parameters.emplace("storage_buffer_size", LogicalType::UBIGINT); + + enable_fun.varargs = LogicalType::ANY; + set.AddFunction(enable_fun); + + auto disable_fun = TableFunction("disable_logging", {}, DisableLogging, BindDisableLogging, nullptr, nullptr); + set.AddFunction(disable_fun); + + auto truncate_fun = TableFunction("truncate_duckdb_logs", {}, TruncateLogs, BindTruncateLogs, nullptr, nullptr); + set.AddFunction(truncate_fun); +} + +} // namespace duckdb diff --git a/src/duckdb/src/function/table/system_functions.cpp b/src/duckdb/src/function/table/system_functions.cpp index 9f1561b22..d10ec5d31 100644 --- a/src/duckdb/src/function/table/system_functions.cpp +++ b/src/duckdb/src/function/table/system_functions.cpp @@ -45,6 +45,7 @@ void BuiltinFunctions::RegisterSQLiteFunctions() { DuckDBTypesFun::RegisterFunction(*this); DuckDBVariablesFun::RegisterFunction(*this); DuckDBViewsFun::RegisterFunction(*this); + EnableLoggingFun::RegisterFunction(*this); TestAllTypesFun::RegisterFunction(*this); TestVectorTypesFun::RegisterFunction(*this); } diff --git a/src/duckdb/src/function/table/version/pragma_version.cpp b/src/duckdb/src/function/table/version/pragma_version.cpp index 44ca40fc2..108d02ff2 100644 --- a/src/duckdb/src/function/table/version/pragma_version.cpp +++ b/src/duckdb/src/function/table/version/pragma_version.cpp @@ -1,5 +1,5 @@ #ifndef DUCKDB_PATCH_VERSION -#define DUCKDB_PATCH_VERSION "0-dev3214" +#define DUCKDB_PATCH_VERSION "0-dev3285" #endif #ifndef DUCKDB_MINOR_VERSION #define DUCKDB_MINOR_VERSION 4 @@ -8,10 +8,10 @@ #define DUCKDB_MAJOR_VERSION 1 #endif #ifndef DUCKDB_VERSION -#define DUCKDB_VERSION "v1.4.0-dev3214" +#define DUCKDB_VERSION "v1.4.0-dev3285" #endif #ifndef DUCKDB_SOURCE_ID -#define DUCKDB_SOURCE_ID "b354bfeff4" +#define DUCKDB_SOURCE_ID "fa13060a63" #endif #include "duckdb/function/table/system_functions.hpp" #include "duckdb/main/database.hpp" diff --git a/src/duckdb/src/include/duckdb/catalog/catalog.hpp b/src/duckdb/src/include/duckdb/catalog/catalog.hpp index 07960fcf3..ac733fb6b 100644 --- a/src/duckdb/src/include/duckdb/catalog/catalog.hpp +++ b/src/duckdb/src/include/duckdb/catalog/catalog.hpp @@ -407,15 +407,16 @@ class Catalog { CatalogEntryLookup TryLookupEntry(CatalogEntryRetriever &retriever, const string &schema, const EntryLookupInfo &lookup_info, OnEntryNotFound if_not_found); static CatalogEntryLookup TryLookupEntry(CatalogEntryRetriever &retriever, const vector &lookups, - const EntryLookupInfo &lookup_info, OnEntryNotFound if_not_found); + const EntryLookupInfo &lookup_info, OnEntryNotFound if_not_found, + bool allow_default_table_lookup); static CatalogEntryLookup TryLookupEntry(CatalogEntryRetriever &retriever, const string &catalog, const string &schema, const EntryLookupInfo &lookup_info, OnEntryNotFound if_not_found); //! Looks for a Catalog with a DefaultTable that matches the lookup - static CatalogEntryLookup TryLookupDefaultTable(CatalogEntryRetriever &retriever, const string &catalog, - const string &schema, const EntryLookupInfo &lookup_info, - OnEntryNotFound if_not_found); + static CatalogEntryLookup TryLookupDefaultTable(CatalogEntryRetriever &retriever, + const EntryLookupInfo &lookup_info, + bool allow_ignore_at_clause = false); //! Return an exception with did-you-mean suggestion. static CatalogException CreateMissingEntryException(CatalogEntryRetriever &retriever, diff --git a/src/duckdb/src/include/duckdb/common/csv_writer.hpp b/src/duckdb/src/include/duckdb/common/csv_writer.hpp new file mode 100644 index 000000000..b2d0e066e --- /dev/null +++ b/src/duckdb/src/include/duckdb/common/csv_writer.hpp @@ -0,0 +1,153 @@ +//===----------------------------------------------------------------------===// +// DuckDB +// +// duckdb/common/csv_writer.hpp +// +// +//===----------------------------------------------------------------------===// + +#pragma once + +#include "duckdb/common/common.hpp" +#include "duckdb/execution/operator/csv_scanner/csv_reader_options.hpp" +#include "duckdb/common/serializer/memory_stream.hpp" + +namespace duckdb { +class MemoryStream; + +enum class CSVNewLineMode { + //! Newlines are written before writing out a new csv line. Ensures we don't write an empty line at the end of + //! a csv file. Uses CSVWriterState::written_anything to keep track of when to insert newlines + WRITE_BEFORE = 0, + //! Newlines are written after every line. This is cleanest in stdout, where lines are expected to end with a + //! newline + WRITE_AFTER = 1 +}; + +struct CSVWriterOptions { + CSVWriterOptions(const string &delim, const char "e, const string &write_newline); + explicit CSVWriterOptions(CSVReaderOptions &options); + + //! The newline string to write + string newline = "\n"; + //! The size of the CSV file (in bytes) that we buffer before we flush it to disk + idx_t flush_size = 4096ULL * 8ULL; + //! For each byte whether the CSV file requires quotes when containing the byte + vector requires_quotes; + //! How to write newlines + CSVNewLineMode newline_writing_mode = CSVNewLineMode::WRITE_BEFORE; +}; + +struct CSVWriterState { + CSVWriterState(ClientContext &context, idx_t flush_size); + CSVWriterState(DatabaseInstance &db, idx_t flush_size); + CSVWriterState(); + ~CSVWriterState(); + + void Reset() { + stream->Rewind(); + written_anything = false; + } + + idx_t flush_size; + unique_ptr stream; + bool written_anything = false; + + bool require_manual_flush = false; +}; + +class CSVWriter { +public: + //! Create a CSVWriter that writes to a (non-owned) WriteStream + CSVWriter(WriteStream &stream, vector name_list, bool shared = true); + + //! Create a CSVWriter that writes to a file + CSVWriter(CSVReaderOptions &options, FileSystem &fs, const string &file_path, FileCompressionType compression, + bool shared = true); + + //! Writes header and prefix if necessary + void Initialize(bool force = false); + + //! Writes the raw string directly into the output stream + void WriteRawString(const string &data); + //! Writes the header directly into the output stream + void WriteHeader(); + //! Write the Raw String, using the local_state + void WriteRawString(const string &prefix, CSVWriterState &local_state); + //! Write a chunk of VARCHAR vectors to the CSV file (any casts are the responsibility of caller) + void WriteChunk(DataChunk &input, CSVWriterState &local_state); + //! (Non-shared only) variant of WriteChunk + void WriteChunk(DataChunk &input); + + //! Flushes all data in the local write state + void Flush(CSVWriterState &local_state); + //! (Non-shared only) variant of Flush + void Flush(); + + //! Resets the state of the writer. Warning: the file_writer is not reset + void Reset(optional_ptr local_state); + + //! Closes the writer, optionally writes a postfix + void Close(); + + unique_ptr InitializeLocalWriteState(ClientContext &context, idx_t flush_size); + unique_ptr InitializeLocalWriteState(DatabaseInstance &db, idx_t flush_size); + + vector> string_casts; + + idx_t BytesWritten(); + + //! BytesWritten + OriginalSize; + idx_t FileSize(); + + bool WrittenAnything() { + return written_anything; + } + void SetWrittenAnything(bool val) { + if (shared) { + lock_guard guard(lock); + written_anything = val; + } else { + written_anything = val; + } + } + + CSVReaderOptions options; + CSVWriterOptions writer_options; + +protected: + void FlushInternal(CSVWriterState &local_state); + void ResetInternal(optional_ptr local_state); + + //! If we've written any rows yet, allows us to prevent a trailing comma when writing JSON ARRAY + bool written_anything = false; + + //! (optional) The owned file writer of this CSVWriter + unique_ptr file_writer; + + //! The WriteStream to write the CSV data to + WriteStream &write_stream; + + idx_t bytes_written = 0; + + bool should_initialize; + + mutex lock; + bool shared; + + unique_ptr global_write_state; + + static void WriteQuoteOrEscape(WriteStream &writer, char quote_or_escape); + static string AddEscapes(char to_be_escaped, char escape, const string &val); + static bool RequiresQuotes(const char *str, idx_t len, const string &null_str, const vector &requires_quotes); + static void WriteQuotedString(WriteStream &writer, const char *str, idx_t len, bool force_quote, + const string &null_str, const vector &requires_quotes, char quote, char escape); + static void WriteQuotedString(WriteStream &writer, const char *str, idx_t len, idx_t col_idx, + CSVReaderOptions &options, CSVWriterOptions &writer_options); + + static void WriteChunk(DataChunk &input, MemoryStream &writer, CSVReaderOptions &options, bool &written_anything, + CSVWriterOptions &writer_options); + static void WriteHeader(MemoryStream &stream, CSVReaderOptions &options, CSVWriterOptions &writer_options); +}; + +} // namespace duckdb diff --git a/src/duckdb/src/include/duckdb/common/file_open_flags.hpp b/src/duckdb/src/include/duckdb/common/file_open_flags.hpp index 4dd14af80..8c1c52812 100644 --- a/src/duckdb/src/include/duckdb/common/file_open_flags.hpp +++ b/src/duckdb/src/include/duckdb/common/file_open_flags.hpp @@ -29,6 +29,7 @@ class FileOpenFlags { static constexpr idx_t FILE_FLAGS_EXCLUSIVE_CREATE = idx_t(1 << 9); static constexpr idx_t FILE_FLAGS_NULL_IF_EXISTS = idx_t(1 << 10); static constexpr idx_t FILE_FLAGS_MULTI_CLIENT_ACCESS = idx_t(1 << 11); + static constexpr idx_t FILE_FLAGS_DISABLE_LOGGING = idx_t(1 << 12); public: FileOpenFlags() = default; @@ -111,6 +112,9 @@ class FileOpenFlags { inline bool MultiClientAccess() const { return flags & FILE_FLAGS_MULTI_CLIENT_ACCESS; } + inline bool DisableLogging() const { + return flags & FILE_FLAGS_DISABLE_LOGGING; + } inline idx_t GetFlagsInternal() const { return flags; } @@ -152,6 +156,9 @@ class FileFlags { //! Multiple clients may access the file at the same time static constexpr FileOpenFlags FILE_FLAGS_MULTI_CLIENT_ACCESS = FileOpenFlags(FileOpenFlags::FILE_FLAGS_MULTI_CLIENT_ACCESS); + //! Disables logging to avoid infinite loops when using FileHandle-backed log storage + static constexpr FileOpenFlags FILE_FLAGS_DISABLE_LOGGING = + FileOpenFlags(FileOpenFlags::FILE_FLAGS_DISABLE_LOGGING); }; } // namespace duckdb diff --git a/src/duckdb/src/include/duckdb/common/file_system.hpp b/src/duckdb/src/include/duckdb/common/file_system.hpp index 3489c5875..b8e05bc4b 100644 --- a/src/duckdb/src/include/duckdb/common/file_system.hpp +++ b/src/duckdb/src/include/duckdb/common/file_system.hpp @@ -279,6 +279,7 @@ class FileSystem { DUCKDB_API static bool IsRemoteFile(const string &path, string &extension); DUCKDB_API virtual void SetDisabledFileSystems(const vector &names); + DUCKDB_API virtual bool SubSystemIsDisabled(const string &name); DUCKDB_API static bool IsDirectory(const OpenFileInfo &info); diff --git a/src/duckdb/src/include/duckdb/common/opener_file_system.hpp b/src/duckdb/src/include/duckdb/common/opener_file_system.hpp index 13032a5dc..519c2bc71 100644 --- a/src/duckdb/src/include/duckdb/common/opener_file_system.hpp +++ b/src/duckdb/src/include/duckdb/common/opener_file_system.hpp @@ -143,6 +143,10 @@ class OpenerFileSystem : public FileSystem { GetFileSystem().SetDisabledFileSystems(names); } + bool SubSystemIsDisabled(const string &name) override { + return GetFileSystem().SubSystemIsDisabled(name); + } + vector ListSubSystems() override { return GetFileSystem().ListSubSystems(); } diff --git a/src/duckdb/src/include/duckdb/common/virtual_file_system.hpp b/src/duckdb/src/include/duckdb/common/virtual_file_system.hpp index 30969f93d..6a0f0346a 100644 --- a/src/duckdb/src/include/duckdb/common/virtual_file_system.hpp +++ b/src/duckdb/src/include/duckdb/common/virtual_file_system.hpp @@ -63,6 +63,7 @@ class VirtualFileSystem : public FileSystem { std::string GetName() const override; void SetDisabledFileSystems(const vector &names) override; + bool SubSystemIsDisabled(const string &name) override; string PathSeparator(const string &path) override; diff --git a/src/duckdb/src/include/duckdb/function/table/read_csv.hpp b/src/duckdb/src/include/duckdb/function/table/read_csv.hpp index 523ee8b07..62a7f0af8 100644 --- a/src/duckdb/src/include/duckdb/function/table/read_csv.hpp +++ b/src/duckdb/src/include/duckdb/function/table/read_csv.hpp @@ -18,6 +18,7 @@ #include "duckdb/function/scalar/strftime_format.hpp" #include "duckdb/function/table_function.hpp" #include "duckdb/execution/operator/csv_scanner/csv_file_scanner.hpp" +#include "duckdb/common/csv_writer.hpp" namespace duckdb { class BaseScanner; @@ -40,25 +41,14 @@ struct BaseCSVData : public TableFunctionData { }; struct WriteCSVData : public BaseCSVData { - WriteCSVData(string file_path, vector sql_types, vector names) - : sql_types(std::move(sql_types)) { - files.push_back(std::move(file_path)); + explicit WriteCSVData(vector names) { options.name_list = std::move(names); if (options.dialect_options.state_machine_options.escape == '\0') { options.dialect_options.state_machine_options.escape = options.dialect_options.state_machine_options.quote; } } - - //! The file path of the CSV file to read or write - vector files; - //! The SQL types to write - vector sql_types; - //! The newline string to write - string newline = "\n"; //! The size of the CSV file (in bytes) that we buffer before we flush it to disk idx_t flush_size = 4096ULL * 8ULL; - //! For each byte whether the CSV file requires quotes when containing the byte - unsafe_unique_array requires_quotes; //! Expressions used to convert the input into strings vector> cast_expressions; }; diff --git a/src/duckdb/src/include/duckdb/function/table/system_functions.hpp b/src/duckdb/src/include/duckdb/function/table/system_functions.hpp index 624fcabc2..e325b2f46 100644 --- a/src/duckdb/src/include/duckdb/function/table/system_functions.hpp +++ b/src/duckdb/src/include/duckdb/function/table/system_functions.hpp @@ -151,6 +151,10 @@ struct DuckDBViewsFun { static void RegisterFunction(BuiltinFunctions &set); }; +struct EnableLoggingFun { + static void RegisterFunction(BuiltinFunctions &set); +}; + struct TestType { TestType(LogicalType type_p, string name_p) : type(std::move(type_p)), name(std::move(name_p)), min_value(Value::MinimumValue(type)), diff --git a/src/duckdb/src/include/duckdb/logging/log_manager.hpp b/src/duckdb/src/include/duckdb/logging/log_manager.hpp index 252721d5f..3fd0bce25 100644 --- a/src/duckdb/src/include/duckdb/logging/log_manager.hpp +++ b/src/duckdb/src/include/duckdb/logging/log_manager.hpp @@ -9,6 +9,7 @@ #pragma once #include "duckdb/logging/logger.hpp" +#include "duckdb/logging/log_storage.hpp" #include "duckdb/common/types/timestamp.hpp" #include "duckdb/common/case_insensitive_map.hpp" @@ -47,15 +48,18 @@ class LogManager : public enable_shared_from_this { //! Get a shared_ptr to the log storage (For example, to scan it) DUCKDB_API shared_ptr GetLogStorage(); - DUCKDB_API bool CanScan(); + DUCKDB_API bool CanScan(LoggingTargetTable table); + DUCKDB_API void SetConfig(DatabaseInstance &db, LogConfig config); DUCKDB_API void SetEnableLogging(bool enable); DUCKDB_API void SetLogMode(LogMode mode); DUCKDB_API void SetLogLevel(LogLevel level); - DUCKDB_API void SetEnabledLogTypes(unordered_set &enabled_log_types); - DUCKDB_API void SetDisabledLogTypes(unordered_set &disabled_log_types); + DUCKDB_API void SetEnabledLogTypes(optional_ptr> enabled_log_types); + DUCKDB_API void SetDisabledLogTypes(optional_ptr> disabled_log_types); DUCKDB_API void SetLogStorage(DatabaseInstance &db, const string &storage_name); + DUCKDB_API void UpdateLogStorageConfig(DatabaseInstance &db, case_insensitive_map_t &config_value); + DUCKDB_API void SetEnableStructuredLoggers(vector &enabled_logger_types); DUCKDB_API void TruncateLogStorage(); @@ -75,6 +79,8 @@ class LogManager : public enable_shared_from_this { // This allows efficiently pushing a cached set of log entries into the log manager void FlushCachedLogEntries(DataChunk &chunk, const RegisteredLoggingContext &context); + void SetLogStorageInternal(DatabaseInstance &db, const string &storage_name); + optional_ptr LookupLogTypeInternal(const string &type); mutex lock; diff --git a/src/duckdb/src/include/duckdb/logging/log_storage.hpp b/src/duckdb/src/include/duckdb/logging/log_storage.hpp index d30d370a7..3b6d0960d 100644 --- a/src/duckdb/src/include/duckdb/logging/log_storage.hpp +++ b/src/duckdb/src/include/duckdb/logging/log_storage.hpp @@ -13,6 +13,8 @@ #include "duckdb/common/optional_idx.hpp" #include "duckdb/common/types.hpp" #include "duckdb/common/unordered_set.hpp" +#include "duckdb/common/serializer/write_stream.hpp" +#include "duckdb/common/serializer/buffered_file_writer.hpp" #include "duckdb/common/types/column/column_data_scan_states.hpp" #include "duckdb/parallel/thread_context.hpp" @@ -21,9 +23,26 @@ namespace duckdb { struct RegisteredLoggingContext; class ColumnDataCollection; struct ColumnDataScanState; +class MemoryStream; +struct LogStorageConfig; +class CSVWriter; +struct CSVWriterState; +class BufferedFileWriter; +struct CSVWriterOptions; +struct CSVReaderOptions; + +//! Logging storage can store entries normalized or denormalized. This enum describes what a single table/file/etc +//! contains +enum class LoggingTargetTable { + ALL_LOGS, // Denormalized: log entries consisting of both the full log entry and the context + LOG_ENTRIES, // Normalized: contains only the log entries and a context_id + LOG_CONTEXTS, // Normalized: contains only the log contexts +}; class LogStorageScanState { public: + explicit LogStorageScanState(LoggingTargetTable table_p) : table(table_p) { + } virtual ~LogStorageScanState() = default; template @@ -36,99 +55,299 @@ class LogStorageScanState { DynamicCastCheck(this); return reinterpret_cast(*this); } + + LoggingTargetTable table; }; -// Interface for writing log entries +// Interface for Log Storage class LogStorage { public: DUCKDB_API explicit LogStorage() { } DUCKDB_API virtual ~LogStorage() = default; + virtual const string GetStorageName() = 0; + + static vector GetSchema(LoggingTargetTable table); + static vector GetColumnNames(LoggingTargetTable table); + //! WRITING DUCKDB_API virtual void WriteLogEntry(timestamp_t timestamp, LogLevel level, const string &log_type, const string &log_message, const RegisteredLoggingContext &context) = 0; DUCKDB_API virtual void WriteLogEntries(DataChunk &chunk, const RegisteredLoggingContext &context) = 0; - DUCKDB_API virtual void Flush() = 0; + DUCKDB_API virtual void FlushAll() = 0; + DUCKDB_API virtual void Flush(LoggingTargetTable table) = 0; + DUCKDB_API virtual void Truncate(); + DUCKDB_API virtual bool IsEnabled(LoggingTargetTable table) = 0; //! READING (OPTIONAL) - DUCKDB_API virtual bool CanScan() { + DUCKDB_API virtual bool CanScan(LoggingTargetTable table) { return false; } - DUCKDB_API virtual unique_ptr CreateScanEntriesState() const; - DUCKDB_API virtual bool ScanEntries(LogStorageScanState &state, DataChunk &result) const; - DUCKDB_API virtual void InitializeScanEntries(LogStorageScanState &state) const; - DUCKDB_API virtual unique_ptr CreateScanContextsState() const; - DUCKDB_API virtual bool ScanContexts(LogStorageScanState &state, DataChunk &result) const; - DUCKDB_API virtual void InitializeScanContexts(LogStorageScanState &state) const; + // Reading interface 1: basic single-threaded scan + DUCKDB_API virtual unique_ptr CreateScanState(LoggingTargetTable table) const; + DUCKDB_API virtual bool Scan(LogStorageScanState &state, DataChunk &result) const; + DUCKDB_API virtual void InitializeScan(LogStorageScanState &state) const; - DUCKDB_API virtual void Truncate(); + // Reading interface 2: using bind_replace + DUCKDB_API virtual unique_ptr BindReplace(ClientContext &context, TableFunctionBindInput &input, + LoggingTargetTable table); + + //! CONFIGURATION + DUCKDB_API virtual void UpdateConfig(DatabaseInstance &db, case_insensitive_map_t &config); }; -class StdOutLogStorage : public LogStorage { +//! The buffering Log storage implements a buffering mechanism around the Base LogStorage class. It implements some +//! general features that most log storages will need. +class BufferingLogStorage : public LogStorage { public: - explicit StdOutLogStorage(); - ~StdOutLogStorage() override; + explicit BufferingLogStorage(DatabaseInstance &db_p, idx_t buffer_size, bool normalize); + ~BufferingLogStorage() override; + + /// (Partially) Implements LogStorage API - //! LogStorage API: WRITING + //! Write out the entry to the buffers void WriteLogEntry(timestamp_t timestamp, LogLevel level, const string &log_type, const string &log_message, - const RegisteredLoggingContext &context) override; - void WriteLogEntries(DataChunk &chunk, const RegisteredLoggingContext &context) override; - void Flush() override; + const RegisteredLoggingContext &context) final; + //! Write out the chunk to the buffers + void WriteLogEntries(DataChunk &chunk, const RegisteredLoggingContext &context) final; + //! Flushes buffers for all tables + void FlushAll() final; + //! Flushes buffer for a specific table + void Flush(LoggingTargetTable table) final; + //! Truncates log storage: both buffers and persistent storage (if applicable) + void Truncate() override; + //! Apply a new log storage configuration + void UpdateConfig(DatabaseInstance &db, case_insensitive_map_t &config) override; + //! Returns whether the table is enabled for this storage + bool IsEnabled(LoggingTargetTable table) override; + +protected: + /// Interface to child classes + + //! Invoked whenever buffers are full to flush to storage + virtual void FlushChunk(LoggingTargetTable table, DataChunk &chunk) = 0; + //! This method is called in a chained way down the class hierarchy. This allows each class to interpret its own + //! part of the config. Unhandled config values that are left over will result in an error + virtual void UpdateConfigInternal(DatabaseInstance &db, case_insensitive_map_t &config); + //! ResetAllBuffers will clear all buffered data. To be overridden by child classes to ensure their buffers are + //! flushed too + virtual void ResetAllBuffers(); + + /// Helper methods + + //! Flushes all tables + void FlushAllInternal(); + //! Flushes one of the tables + void FlushInternal(LoggingTargetTable table); + //! Whether a specific table is available in the log storage + bool IsEnabledInternal(LoggingTargetTable table); + + //! lock to be used by this class and child classes to ensure thread safety TODO: maybe remove and delegate + //! thread-safety to LogManager? + mutable mutex lock; + //! Switches between using false = use LoggingTargetTable::ALL_LOGS, true = use LoggingTargetTable::LOG_ENTIRES + + //! LoggingTargetTable::CONTEXTS + bool normalize_contexts = true; + +private: + //! Resets the log buffers + void ResetLogBuffers(); + //! Write out a logging context + void WriteLoggingContext(const RegisteredLoggingContext &context); + + //! The currently registered RegisteredLoggingContext's + unordered_set registered_contexts; + //! Configuration for buffering + idx_t buffer_limit = 0; + //! Debug option for testing buffering behaviour + bool only_flush_on_full_buffer = false; + //! The buffers used for each table + unordered_map> buffers; + //! This flag is set whenever a new context_is written to the entry buffer. It means that the next flush of + //! LoggingTargetTable::LOG_ENTRIES also requires a flush of LoggingTargetTable::LOG_CONTEXTS + bool flush_contexts_on_next_entry_flush = false; +}; + +//! The CSVLogStorage implements an additional layer on the BufferingLogStorage which will handle converting the log +//! entries and contexts to CSV lines. It provides functionality to write log data in CSV format with automatic type +//! casting and configuration of CSV writers. This class serves as a base for both file-based and stdout-based CSV +//! logging. +class CSVLogStorage : public BufferingLogStorage { +public: + explicit CSVLogStorage(DatabaseInstance &db, bool normalize, idx_t buffer_size); + ~CSVLogStorage() override; + +protected: + /// Implement the BufferingLogStorage interface + + //! Flushes the Chunk to the CSV writers + void FlushChunk(LoggingTargetTable table, DataChunk &chunk) final; + //! Resets all buffers and state + void ResetAllBuffers() override; + //! Implements CSVLogStorage specific config handling + void UpdateConfigInternal(DatabaseInstance &db, case_insensitive_map_t &config) override; + + /// Interface to child classes + + //! Hooks for child class to run code pre-flush + virtual void BeforeFlush(LoggingTargetTable table, DataChunk &chunk) {}; + virtual void AfterFlush(LoggingTargetTable table, DataChunk &chunk) {}; + + /// Helper functions + + //! To be called by child classed to register the CSV writers used to write to. + void RegisterWriter(LoggingTargetTable table, unique_ptr writer); + //! Returns the writer for a table + CSVWriter &GetWriter(LoggingTargetTable table); + //! Configure a CSV writer by initializing its settings with the `writer_options` and `reader_options` settings + void SetWriterConfigs(CSVWriter &Writer, vector column_names); + //! Allows child classes to manipulate options + CSVWriterOptions &GetCSVWriterOptions(); + //! Allows child classes to manipulate options + CSVReaderOptions &GetCSVReaderOptions(); + +private: + //! Perform the cast (does not reset input chunk!) + void ExecuteCast(LoggingTargetTable table, DataChunk &chunk); + //! Reset the Cast chunks + void ResetCastChunk(); + //! Initialize the cast chunks + void InitializeCastChunk(LoggingTargetTable table); + + //! The cast buffers used to cast from the original types to the VARCHAR types ready to write to CSV format + unordered_map> cast_buffers; + //! The writers to be registered by child classes + unordered_map> writers; + //! CSV Options to initialize the CSVWriters with. TODO: cleanup, this is now a little bit of a mixed bag of + //! settings + unique_ptr writer_options; + unique_ptr reader_options; +}; + +//! Implements a stdout-based log storage using log lines written in CSV format to allow for easy parsing of the log +//! messages Note that this only supports denormalized logging since there is only 1 practical output stream. +class StdOutLogStorage : public CSVLogStorage { +public: + explicit StdOutLogStorage(DatabaseInstance &db); + ~StdOutLogStorage() override; + + const string GetStorageName() override { + return "StdOutLogStorage"; + } + +private: + class StdOutWriteStream : public WriteStream { + void WriteData(const_data_ptr_t buffer, idx_t write_size) override; + }; + + StdOutWriteStream stdout_stream; +}; + +//! FileLogStorage implements a file-based logging system in CSV format. +//! It implements CSVLogStorage to provide persistent log storage in CSV files. The FileLogStorage can operate in +//! normalized (separate files for entries and contexts) or denormalized mode (single file) +class FileLogStorage : public CSVLogStorage { +public: + explicit FileLogStorage(DatabaseInstance &db); + ~FileLogStorage() override; + + const string GetStorageName() override { + return "FileLogStorage"; + } + + /// Implement LogStorage interface + + //! Truncates the csv files void Truncate() override; + //! Bind replace function to scan the different tables + unique_ptr BindReplace(ClientContext &context, TableFunctionBindInput &input, + LoggingTargetTable table) override; + +protected: + /// Implement CSVLogStorage interface + + //! Handles the config related to the FileLogStorage + void UpdateConfigInternal(DatabaseInstance &db, case_insensitive_map_t &config) override; + //! Lazily initializes the CSV files before first flush + void BeforeFlush(LoggingTargetTable table, DataChunk &chunk) override; + //! Calls `Sync` on the FileWriters to ensure a LogStorage Flush is flushed to disk immediately + void AfterFlush(LoggingTargetTable table, DataChunk &chunk) override; + +private: + //! Intialize the csv file for `table` + void InitializeFile(DatabaseInstance &db, LoggingTargetTable table); + //! Initialize the filewriter to be passed to the CSVWriter + static unique_ptr InitializeFileWriter(DatabaseInstance &db, const string &path); + //! Ensures the table is initialized, used in lazy initialization. If already initialized this will NOP + void Initialize(LoggingTargetTable table); + //! Internal helper function to handle the BindReplace generation + unique_ptr BindReplaceInternal(ClientContext &context, TableFunctionBindInput &input, const string &path, + const string &select_clause, const string &csv_columns); + + //! DB reference to get the DB filesystem + DatabaseInstance &db; + + //! Writer for a table + struct TableWriter { + //! Passed as WriteStreams to the CSVWriter in the base class + unique_ptr file_writer; + //! Path to initialize the file_writer from + string path; + //! Whether the file_writer has been (lazily) initialized + bool initialized = false; + }; + + //! The table info per table + unordered_map tables; + + //! Base path to generate the file paths from + string base_path; + +private: + void SetPaths(const string &base_path); }; +//! State for scanning the in memory buffers class InMemoryLogStorageScanState : public LogStorageScanState { public: - InMemoryLogStorageScanState(); + explicit InMemoryLogStorageScanState(LoggingTargetTable table); ~InMemoryLogStorageScanState() override; ColumnDataScanState scan_state; }; -class InMemoryLogStorage : public LogStorage { +//! The InMemoryLogStorage implements a log storage that is backed by ColumnDataCollection's. It only support normalized +//! mode and support a basic single-threaded scan. TODO: improve? +class InMemoryLogStorage : public BufferingLogStorage { public: explicit InMemoryLogStorage(DatabaseInstance &db); ~InMemoryLogStorage() override; - //! LogStorage API: WRITING - void WriteLogEntry(timestamp_t timestamp, LogLevel level, const string &log_type, const string &log_message, - const RegisteredLoggingContext &context) override; - void WriteLogEntries(DataChunk &chunk, const RegisteredLoggingContext &context) override; - void Flush() override; - - void Truncate() override; - - //! LogStorage API: READING - bool CanScan() override; - - unique_ptr CreateScanEntriesState() const override; - bool ScanEntries(LogStorageScanState &state, DataChunk &result) const override; - void InitializeScanEntries(LogStorageScanState &state) const override; - unique_ptr CreateScanContextsState() const override; - bool ScanContexts(LogStorageScanState &state, DataChunk &result) const override; - void InitializeScanContexts(LogStorageScanState &state) const override; + const string GetStorageName() override { + return "InMemoryLogStorage"; + } -protected: - void WriteLoggingContext(const RegisteredLoggingContext &context); - void ResetBuffers(); + //! Implement LogStorage Single-threaded scan interface + bool CanScan(LoggingTargetTable table) override; + unique_ptr CreateScanState(LoggingTargetTable table) const override; + bool Scan(LogStorageScanState &state, DataChunk &result) const override; + void InitializeScan(LogStorageScanState &state) const override; protected: - mutable mutex lock; + /// Implement BufferingLogStorage interface - void FlushInternal(); + //! Flushes a chunk to the corresponding ColumnDataCollection + void FlushChunk(LoggingTargetTable table, DataChunk &chunk) override; + //! Resets the all ColumnDataCollection's + void ResetAllBuffers() override; - //! Internal log entry storage - unique_ptr log_entries; - unique_ptr log_contexts; - - unordered_set registered_contexts; +private: + //! Helper function to get the buffer + ColumnDataCollection &GetBuffer(LoggingTargetTable table) const; - // Cache for direct logging - unique_ptr entry_buffer; - unique_ptr log_context_buffer; - idx_t max_buffer_size; + unordered_map> log_storage_buffers; }; } // namespace duckdb diff --git a/src/duckdb/src/include/duckdb/main/settings.hpp b/src/duckdb/src/include/duckdb/main/settings.hpp index df38775ad..a8b1879ea 100644 --- a/src/duckdb/src/include/duckdb/main/settings.hpp +++ b/src/duckdb/src/include/duckdb/main/settings.hpp @@ -860,7 +860,7 @@ struct LoggingLevel { struct LoggingMode { using RETURN_TYPE = string; static constexpr const char *Name = "logging_mode"; - static constexpr const char *Description = "Enables the logger"; + static constexpr const char *Description = "Determines which types of log messages are logged"; static constexpr const char *InputType = "VARCHAR"; static void SetGlobal(DatabaseInstance *db, DBConfig &config, const Value ¶meter); static void ResetGlobal(DatabaseInstance *db, DBConfig &config); @@ -870,7 +870,7 @@ struct LoggingMode { struct LoggingStorage { using RETURN_TYPE = string; static constexpr const char *Name = "logging_storage"; - static constexpr const char *Description = "Set the logging storage (memory/stdout/file)"; + static constexpr const char *Description = "Set the logging storage (memory/stdout/file/)"; static constexpr const char *InputType = "VARCHAR"; static void SetGlobal(DatabaseInstance *db, DBConfig &config, const Value ¶meter); static void ResetGlobal(DatabaseInstance *db, DBConfig &config); diff --git a/src/duckdb/src/logging/log_manager.cpp b/src/duckdb/src/logging/log_manager.cpp index e923c01c5..5bdf47912 100644 --- a/src/duckdb/src/logging/log_manager.cpp +++ b/src/duckdb/src/logging/log_manager.cpp @@ -5,6 +5,7 @@ #include "duckdb/main/database.hpp" #include "duckdb/main/client_context.hpp" #include "duckdb/main/client_data.hpp" +#include "duckdb/common/local_file_system.hpp" namespace duckdb { @@ -49,7 +50,7 @@ shared_ptr LogManager::GlobalLoggerReference() { void LogManager::Flush() { unique_lock lck(lock); - log_storage->Flush(); + log_storage->FlushAll(); } shared_ptr LogManager::GetLogStorage() { @@ -57,9 +58,9 @@ shared_ptr LogManager::GetLogStorage() { return log_storage; } -bool LogManager::CanScan() { +bool LogManager::CanScan(LoggingTargetTable table) { unique_lock lck(lock); - return log_storage->CanScan(); + return log_storage->CanScan(table); } LogManager::LogManager(DatabaseInstance &db, LogConfig config_p) : config(std::move(config_p)) { @@ -102,6 +103,18 @@ void LogManager::FlushCachedLogEntries(DataChunk &chunk, const RegisteredLogging throw NotImplementedException("FlushCachedLogEntries"); } +void LogManager::SetConfig(DatabaseInstance &db, LogConfig config_p) { + unique_lock lck(lock); + + // We need extra handling for switching storage + SetLogStorageInternal(db, config_p.storage); + + // Apply the remainder of the config + config = std::move(config_p); + + global_logger->UpdateConfig(config); +} + void LogManager::SetEnableLogging(bool enable) { unique_lock lck(lock); config.enabled = enable; @@ -120,35 +133,54 @@ void LogManager::SetLogLevel(LogLevel level) { global_logger->UpdateConfig(config); } -void LogManager::SetEnabledLogTypes(unordered_set &enabled_log_types) { +void LogManager::SetEnabledLogTypes(optional_ptr> enabled_log_types) { unique_lock lck(lock); - config.enabled_log_types = enabled_log_types; + if (enabled_log_types) { + config.enabled_log_types = *enabled_log_types; + } else { + config.enabled_log_types = {}; + } global_logger->UpdateConfig(config); } -void LogManager::SetDisabledLogTypes(unordered_set &disabled_log_types) { +void LogManager::SetDisabledLogTypes(optional_ptr> disabled_log_types) { unique_lock lck(lock); - config.disabled_log_types = disabled_log_types; + if (disabled_log_types) { + config.disabled_log_types = *disabled_log_types; + } else { + config.disabled_log_types = {}; + } global_logger->UpdateConfig(config); } void LogManager::SetLogStorage(DatabaseInstance &db, const string &storage_name) { unique_lock lck(lock); + SetLogStorageInternal(db, storage_name); +} + +void LogManager::SetLogStorageInternal(DatabaseInstance &db, const string &storage_name) { auto storage_name_to_lower = StringUtil::Lower(storage_name); if (config.storage == storage_name_to_lower) { return; } + if (storage_name_to_lower == LogConfig::FILE_STORAGE_NAME) { + auto &fs = FileSystem::GetFileSystem(db); + if (fs.SubSystemIsDisabled(LocalFileSystem().GetName())) { + throw InvalidConfigurationException("Can not enable file logging with the LocalFileSystem disabled"); + } + } + // Flush the old storage, we are going to replace it. - log_storage->Flush(); + log_storage->FlushAll(); if (storage_name_to_lower == LogConfig::IN_MEMORY_STORAGE_NAME) { log_storage = make_shared_ptr(db); } else if (storage_name_to_lower == LogConfig::STDOUT_STORAGE_NAME) { - log_storage = make_shared_ptr(); + log_storage = make_shared_ptr(db); } else if (storage_name_to_lower == LogConfig::FILE_STORAGE_NAME) { - throw NotImplementedException("File log storage is not yet implemented"); + log_storage = make_shared_ptr(db); } else if (registered_log_storages.find(storage_name_to_lower) != registered_log_storages.end()) { log_storage = registered_log_storages[storage_name_to_lower]; } else { @@ -157,6 +189,11 @@ void LogManager::SetLogStorage(DatabaseInstance &db, const string &storage_name) config.storage = storage_name_to_lower; } +void LogManager::UpdateLogStorageConfig(DatabaseInstance &db, case_insensitive_map_t &config_value) { + unique_lock lck(lock); + log_storage->UpdateConfig(db, config_value); +} + void LogManager::SetEnableStructuredLoggers(vector &enabled_logger_types) { unique_lock lck(lock); config.enabled_log_types.clear(); diff --git a/src/duckdb/src/logging/log_storage.cpp b/src/duckdb/src/logging/log_storage.cpp index 625757490..3c6845f64 100644 --- a/src/duckdb/src/logging/log_storage.cpp +++ b/src/duckdb/src/logging/log_storage.cpp @@ -1,239 +1,775 @@ #include "duckdb/logging/log_storage.hpp" + +#include "duckdb/common/csv_writer.hpp" +#include "duckdb/common/local_file_system.hpp" +#include "duckdb/function/table/read_csv.hpp" +#include "duckdb/common/serializer/memory_stream.hpp" +#include "duckdb/main/database_file_opener.hpp" #include "duckdb/logging/logging.hpp" #include "duckdb/main/database.hpp" #include "duckdb/main/client_context.hpp" - +#include "duckdb/parser/parser.hpp" +#include "duckdb/parser/tableref.hpp" +#include "duckdb/parser/tableref/subqueryref.hpp" +#include "duckdb/function/cast/vector_cast_helpers.hpp" +#include "duckdb/common/operator/string_cast.hpp" +#include "duckdb/execution/operator/csv_scanner/sniffer/csv_sniffer.hpp" + +#include #include namespace duckdb { -unique_ptr LogStorage::CreateScanEntriesState() const { +vector LogStorage::GetSchema(LoggingTargetTable table) { + switch (table) { + case LoggingTargetTable::ALL_LOGS: + return { + LogicalType::UBIGINT, // context_id + LogicalType::VARCHAR, // scope + LogicalType::UBIGINT, // connection_id + LogicalType::UBIGINT, // transaction_id + LogicalType::UBIGINT, // query_id + LogicalType::UBIGINT, // thread + LogicalType::TIMESTAMP, // timestamp + LogicalType::VARCHAR, // log_type + LogicalType::VARCHAR, // level + LogicalType::VARCHAR, // message + }; + case LoggingTargetTable::LOG_ENTRIES: + return { + LogicalType::UBIGINT, // context_id + LogicalType::TIMESTAMP, // timestamp + LogicalType::VARCHAR, // log_type + LogicalType::VARCHAR, // level + LogicalType::VARCHAR, // message + }; + case LoggingTargetTable::LOG_CONTEXTS: + return { + LogicalType::UBIGINT, // context_id + LogicalType::VARCHAR, // scope + LogicalType::UBIGINT, // connection_id + LogicalType::UBIGINT, // transaction_id + LogicalType::UBIGINT, // query_id + LogicalType::UBIGINT, // thread + }; + default: + throw NotImplementedException("Unknown logging target table"); + } +} + +vector LogStorage::GetColumnNames(LoggingTargetTable table) { + switch (table) { + case LoggingTargetTable::ALL_LOGS: + return { + "context_id", "scope", "connection_id", "transaction_id", "query_id", + "thread_id", "timestamp", "type", "log_level", "message", + }; + case LoggingTargetTable::LOG_ENTRIES: + return {"context_id", "timestamp", "type", "log_level", "message"}; + case LoggingTargetTable::LOG_CONTEXTS: + return { + "context_id", "scope", "connection_id", "transaction_id", "query_id", "thread_id", + }; + default: + throw NotImplementedException("Unknown logging target table"); + } +} + +unique_ptr LogStorage::CreateScanState(LoggingTargetTable table) const { throw NotImplementedException("Not implemented for this LogStorage: CreateScanEntriesState"); } -bool LogStorage::ScanEntries(LogStorageScanState &state, DataChunk &result) const { +bool LogStorage::Scan(LogStorageScanState &state, DataChunk &result) const { throw NotImplementedException("Not implemented for this LogStorage: ScanEntries"); } -void LogStorage::InitializeScanEntries(LogStorageScanState &state) const { +void LogStorage::InitializeScan(LogStorageScanState &state) const { throw NotImplementedException("Not implemented for this LogStorage: InitializeScanEntries"); } -unique_ptr LogStorage::CreateScanContextsState() const { - throw NotImplementedException("Not implemented for this LogStorage: CreateScanContextsState"); +void LogStorage::Truncate() { + throw NotImplementedException("Not implemented for this LogStorage: TruncateLogStorage"); +} + +void LogStorage::UpdateConfig(DatabaseInstance &db, case_insensitive_map_t &config) { + if (config.size() > 1) { + throw InvalidInputException("LogStorage does not support passing configuration"); + } +} + +unique_ptr LogStorage::BindReplace(ClientContext &context, TableFunctionBindInput &input, + LoggingTargetTable table) { + return nullptr; +} + +CSVLogStorage::~CSVLogStorage() { } -bool LogStorage::ScanContexts(LogStorageScanState &state, DataChunk &result) const { - throw NotImplementedException("Not implemented for this LogStorage: ScanContexts"); + +CSVLogStorage::CSVLogStorage(DatabaseInstance &db, bool normalize, idx_t buffer_size) + : BufferingLogStorage(db, buffer_size, normalize) { + reader_options = make_uniq(); + writer_options = make_uniq(*reader_options); + + reader_options->dialect_options.state_machine_options.escape = '\"'; + reader_options->dialect_options.state_machine_options.quote = '\"'; + reader_options->dialect_options.state_machine_options.delimiter = CSVOption("\t"); + + ResetCastChunk(); } -void LogStorage::InitializeScanContexts(LogStorageScanState &state) const { - throw NotImplementedException("Not implemented for this LogStorage: InitializeScanContexts"); + +void BufferingLogStorage::UpdateConfig(DatabaseInstance &db, case_insensitive_map_t &config) { + lock_guard lck(lock); + return UpdateConfigInternal(db, config); } -void LogStorage::Truncate() { - throw NotImplementedException("Not implemented for this LogStorage: TruncateLogStorage"); + +bool BufferingLogStorage::IsEnabled(LoggingTargetTable table) { + lock_guard lck(lock); + return IsEnabledInternal(table); +} + +bool BufferingLogStorage::IsEnabledInternal(LoggingTargetTable table) { + if (normalize_contexts) { + return table == LoggingTargetTable::LOG_CONTEXTS || table == LoggingTargetTable::LOG_ENTRIES; + } + + return table == LoggingTargetTable::ALL_LOGS; +} + +void CSVLogStorage::ExecuteCast(LoggingTargetTable table, DataChunk &chunk) { + // Reset the cast buffer before use + cast_buffers[table]->Reset(); + + auto &cast_buffer = *cast_buffers[table]; + idx_t count = chunk.size(); + + // Do default casts + for (idx_t i = 0; i < chunk.data.size(); i++) { + VectorOperations::DefaultCast(chunk.data[i], cast_buffer.data[i], count, false); + } + cast_buffer.SetCardinality(count); +} + +void CSVLogStorage::ResetAllBuffers() { + BufferingLogStorage::ResetAllBuffers(); + ResetCastChunk(); +} + +void CSVLogStorage::UpdateConfigInternal(DatabaseInstance &db, case_insensitive_map_t &config) { + auto config_copy = config; + + bool changed_writer_settings = false; + + vector to_remove; + for (const auto &it : config_copy) { + auto key = StringUtil::Lower(it.first); + if (key == "delim") { + changed_writer_settings = true; + reader_options->dialect_options.state_machine_options.delimiter = CSVOption(it.second.ToString()); + to_remove.push_back(it.first); + } + } + + if (changed_writer_settings) { + for (auto &writer : writers) { + SetWriterConfigs(*writer.second, GetColumnNames(writer.first)); + } + } + + for (const auto &it : to_remove) { + config_copy.erase(it); + } + + return BufferingLogStorage::UpdateConfigInternal(db, config_copy); +} + +void CSVLogStorage::RegisterWriter(LoggingTargetTable table, unique_ptr writer) { + writers[table] = std::move(writer); +} + +CSVWriter &CSVLogStorage::GetWriter(LoggingTargetTable table) { + return *writers[table]; +} + +void CSVLogStorage::InitializeCastChunk(LoggingTargetTable table) { + cast_buffers[table] = make_uniq(); + + vector types; + types.resize(GetSchema(table).size(), LogicalType::VARCHAR); + + cast_buffers[table]->Initialize(Allocator::DefaultAllocator(), types); +} +void CSVLogStorage::ResetCastChunk() { + InitializeCastChunk(LoggingTargetTable::LOG_ENTRIES); + InitializeCastChunk(LoggingTargetTable::LOG_CONTEXTS); + InitializeCastChunk(LoggingTargetTable::ALL_LOGS); +} + +void CSVLogStorage::SetWriterConfigs(CSVWriter &writer, vector column_names) { + writer.options = *reader_options; + writer.writer_options = *writer_options; + + // Update the config with the column names since that is different per schema + writer.options.name_list = std::move(column_names); + writer.options.force_quote = vector(writer.options.name_list.size(), false); +} + +CSVReaderOptions &CSVLogStorage::GetCSVReaderOptions() { + return *reader_options; +} + +CSVWriterOptions &CSVLogStorage::GetCSVWriterOptions() { + return *writer_options; +} + +void CSVLogStorage::FlushChunk(LoggingTargetTable table, DataChunk &chunk) { + BeforeFlush(table, chunk); + + // Execute the cast + ExecuteCast(table, chunk); + + // Write the chunk to the CSVWriter + writers[table]->WriteChunk(*cast_buffers[table]); + writers[table]->Flush(); + + // Call child class to implement any post flushing behaviour (e.g. calling sync) + AfterFlush(table, chunk); + + // Reset the cast buffer + cast_buffers[table]->Reset(); +} + +void BufferingLogStorage::UpdateConfigInternal(DatabaseInstance &db, case_insensitive_map_t &config) { + for (const auto &it : config) { + if (StringUtil::Lower(it.first) == "buffer_size") { + buffer_limit = it.second.GetValue(); + ResetAllBuffers(); + } else if (StringUtil::Lower(it.first) == "only_flush_on_full_buffer") { + // This is a debug option used during testing. It disables the manual + only_flush_on_full_buffer = it.second.GetValue(); + } else if (StringUtil::Lower(it.first) == "normalize") { + throw InternalException("'normalize' setting should be handled in child class"); + } else { + throw InvalidInputException("Unrecognized log storage config option for storage: '%s': '%s'", + GetStorageName(), it.first); + } + } +} + +void StdOutLogStorage::StdOutWriteStream::WriteData(const_data_ptr_t buffer, idx_t write_size) { + std::cout.write(const_char_ptr_cast(buffer), NumericCast(write_size)); + std::cout.flush(); } -StdOutLogStorage::StdOutLogStorage() { +StdOutLogStorage::StdOutLogStorage(DatabaseInstance &db) : CSVLogStorage(db, false, 1) { + // StdOutLogStorage is denormalized only + auto target_table = LoggingTargetTable::ALL_LOGS; + + // Set storage specific defaults + GetCSVWriterOptions().newline_writing_mode = CSVNewLineMode::WRITE_AFTER; + GetCSVReaderOptions().dialect_options.state_machine_options.delimiter = CSVOption("\t"); + + // Create and configure writer + auto writer = make_uniq(stdout_stream, GetColumnNames(target_table), false); + SetWriterConfigs(*writer, GetColumnNames(target_table)); + + RegisterWriter(target_table, std::move(writer)); } StdOutLogStorage::~StdOutLogStorage() { } -void StdOutLogStorage::WriteLogEntry(timestamp_t timestamp, LogLevel level, const string &log_type, - const string &log_message, const RegisteredLoggingContext &context) { - std::cout << StringUtil::Format( - "[LOG] %s, %s, %s, %s, %s, %s, %s, %s\n", Value::TIMESTAMP(timestamp).ToString(), log_type, - EnumUtil::ToString(level), log_message, EnumUtil::ToString(context.context.scope), - context.context.connection_id.IsValid() ? to_string(context.context.connection_id.GetIndex()) : "NULL", - context.context.transaction_id.IsValid() ? to_string(context.context.transaction_id.GetIndex()) : "NULL", - context.context.thread_id.IsValid() ? to_string(context.context.thread_id.GetIndex()) : "NULL"); +FileLogStorage::FileLogStorage(DatabaseInstance &db_p) : CSVLogStorage(db_p, true, STANDARD_VECTOR_SIZE), db(db_p) { + tables[LoggingTargetTable::ALL_LOGS] = TableWriter(); + tables[LoggingTargetTable::LOG_CONTEXTS] = TableWriter(); + tables[LoggingTargetTable::LOG_ENTRIES] = TableWriter(); + + // Set storage specific defaults + GetCSVWriterOptions().newline_writing_mode = CSVNewLineMode::WRITE_BEFORE; + GetCSVReaderOptions().dialect_options.state_machine_options.delimiter = CSVOption(","); +} + +FileLogStorage::~FileLogStorage() { } -void StdOutLogStorage::WriteLogEntries(DataChunk &chunk, const RegisteredLoggingContext &context) { - throw NotImplementedException("StdOutLogStorage::WriteLogEntries"); +void FileLogStorage::InitializeFile(DatabaseInstance &db, LoggingTargetTable table) { + auto &table_writer = tables[table]; + + // reset the files writer, we may be re-initializing it here and otherwise we hold 2 handles to the same file + table_writer.file_writer.reset(); + + // (re)initialize the file writer + table_writer.file_writer = InitializeFileWriter(db, table_writer.path); + auto file_writer = table_writer.file_writer.get(); + + // Create CSV writer that writes to file + auto column_names = GetColumnNames(table); + + // Configure writer + auto csv_writer = make_uniq(*file_writer, column_names, false); + SetWriterConfigs(*csv_writer, column_names); + bool should_write_header = file_writer->handle->GetFileSize() == 0; + + // We write the header only if the file was empty: when appending to the file we don't + csv_writer->options.dialect_options.header = {should_write_header, true}; + + // Initialize the writer, this writes out the header if required + csv_writer->Initialize(); + + // Needed to ensure we correctly start with a newline + csv_writer->SetWrittenAnything(true); + + RegisterWriter(table, std::move(csv_writer)); + + // Ensures that the file is fully initialized when this function returns + file_writer->Sync(); + + table_writer.initialized = true; } -void StdOutLogStorage::Truncate() { - // NOP +unique_ptr FileLogStorage::InitializeFileWriter(DatabaseInstance &db, const string &path) { + auto &fs = db.GetFileSystem(); + + // Create parent directories if non existent + auto pos = path.find_last_of(fs.PathSeparator(path)); + if (pos != path.npos) { + fs.CreateDirectoriesRecursive(path.substr(0, pos)); + } + + FileOpenFlags flags; + if (!fs.FileExists(path)) { + flags = FileFlags::FILE_FLAGS_DISABLE_LOGGING | FileFlags::FILE_FLAGS_WRITE | + FileFlags::FILE_FLAGS_FILE_CREATE_NEW | FileCompressionType::UNCOMPRESSED; + } else { + flags = FileFlags::FILE_FLAGS_DISABLE_LOGGING | FileFlags::FILE_FLAGS_WRITE | FileFlags::FILE_FLAGS_APPEND; + } + + return make_uniq(fs, path, flags); } -void StdOutLogStorage::Flush() { - // NOP +void FileLogStorage::Truncate() { + lock_guard lck(lock); + + // Reset buffers + ResetAllBuffers(); + + for (const auto &it : tables) { + auto &file_writer = it.second.file_writer; + if (!file_writer) { + continue; + } + // Truncate the file writer + file_writer->Truncate(0); + // Re-initialize the corresponding CSVWriter + GetWriter(it.first).Initialize(true); + } } -InMemoryLogStorageScanState::InMemoryLogStorageScanState() { +void FileLogStorage::BeforeFlush(LoggingTargetTable table, DataChunk &) { + // Lazily initialize the files + Initialize(table); } -InMemoryLogStorageScanState::~InMemoryLogStorageScanState() { + +void FileLogStorage::AfterFlush(LoggingTargetTable table, DataChunk &) { + tables[table].file_writer->Sync(); } -InMemoryLogStorage::InMemoryLogStorage(DatabaseInstance &db_p) - : entry_buffer(make_uniq()), log_context_buffer(make_uniq()) { - // LogEntry Schema - vector log_entry_schema = { - LogicalType::UBIGINT, // context_id - LogicalType::TIMESTAMP, // timestamp - LogicalType::VARCHAR, // log_type TODO: const vector where possible? - LogicalType::VARCHAR, // level TODO: enumify - LogicalType::VARCHAR, // message - }; +void FileLogStorage::Initialize(LoggingTargetTable table) { + auto &table_writer = tables[table]; + if (!table_writer.initialized) { + if (table_writer.path.empty()) { + throw InvalidConfigurationException("Failed to initialize file log storage table, path wasn't set"); + } + + InitializeFile(db, table); + } +} - // LogContext Schema - vector log_context_schema = { - LogicalType::UBIGINT, // context_id - LogicalType::VARCHAR, // scope TODO: enumify - LogicalType::UBIGINT, // connection_id - LogicalType::UBIGINT, // transaction_id - LogicalType::UBIGINT, // query_id - LogicalType::UBIGINT, // thread - }; +void FileLogStorage::SetPaths(const string &base_path) { + for (auto &it : tables) { + it.second.path.clear(); + } - max_buffer_size = STANDARD_VECTOR_SIZE; - entry_buffer->Initialize(Allocator::DefaultAllocator(), log_entry_schema, max_buffer_size); - log_context_buffer->Initialize(Allocator::DefaultAllocator(), log_context_schema, max_buffer_size); - log_entries = make_uniq(db_p.GetBufferManager(), log_entry_schema); - log_contexts = make_uniq(db_p.GetBufferManager(), log_context_schema); + LocalFileSystem fs; + if (normalize_contexts) { + tables[LoggingTargetTable::LOG_CONTEXTS].path = fs.JoinPath(base_path, "duckdb_log_contexts.csv"); + tables[LoggingTargetTable::LOG_ENTRIES].path = fs.JoinPath(base_path, "duckdb_log_entries.csv"); + } else { + if (StringUtil::EndsWith(base_path, ".csv")) { + tables[LoggingTargetTable::ALL_LOGS].path = base_path; + } else { + tables[LoggingTargetTable::LOG_ENTRIES].path = fs.JoinPath(base_path, "duckdb_log_entries.csv"); + } + } } -void InMemoryLogStorage::ResetBuffers() { - entry_buffer->Reset(); - log_context_buffer->Reset(); +void FileLogStorage::UpdateConfigInternal(DatabaseInstance &db, case_insensitive_map_t &config) { + auto config_copy = config; + + string new_path; + bool normalize_contexts_new_value = normalize_contexts; + bool normalize_set_explicitly = false; + bool require_reinitializing_files = false; + + vector to_remove; + for (const auto &it : config_copy) { + auto key = StringUtil::Lower(it.first); + if (key == "path") { + auto path_value = it.second.ToString(); + //! We implicitly set normalize to false when a path ending in .csv is specified + if (!normalize_set_explicitly && StringUtil::EndsWith(path_value, ".csv")) { + normalize_contexts_new_value = false; + } + new_path = path_value; + to_remove.push_back(it.first); + } else if (key == "normalize") { + normalize_set_explicitly = true; + normalize_contexts_new_value = it.second.GetValue(); + to_remove.push_back(it.first); + } else if (key == "delim") { + require_reinitializing_files = true; + } + } + + if (StringUtil::EndsWith(new_path, ".csv") && normalize_contexts_new_value) { + throw InvalidConfigurationException( + "Can not set path to '%s' while normalize is true. Normalize will make DuckDB write multiple log files to " + "more efficiently store log entries. Please specify a directory path instead of a csv file path, or set " + "normalize to false.", + new_path); + } + + // If any writer is initialized, we flush first: + // - when switching between normalized and denormalized, this is necessary since we are changing the buffer schema + // - when simply changing the path, it avoids writing log entries written before this change to end up in the new + // file + bool initialized = false; + for (auto &it : tables) { + initialized |= it.second.initialized; + } + if (initialized) { + FlushAllInternal(); + } + + require_reinitializing_files |= normalize_contexts != normalize_contexts_new_value; + normalize_contexts = normalize_contexts_new_value; + + // Reset the buffers to ensure they have the correct schema + if (require_reinitializing_files) { + ResetAllBuffers(); + + // Mark tables as uninitialized + for (auto &table : tables) { + table.second.initialized = false; + } + } + + // Apply any path change + if (new_path != base_path) { + base_path = new_path; + SetPaths(new_path); + } - log_entries->Reset(); - log_contexts->Reset(); + for (const auto &it : to_remove) { + config_copy.erase(it); + } - registered_contexts.clear(); + CSVLogStorage::UpdateConfigInternal(db, config_copy); } -InMemoryLogStorage::~InMemoryLogStorage() { +unique_ptr FileLogStorage::BindReplaceInternal(ClientContext &context, TableFunctionBindInput &input, + const string &path, const string &select_clause, + const string &csv_columns) { + string sub_query_string; + + string escaped_path = KeywordHelper::WriteOptionallyQuoted(path); + sub_query_string = + StringUtil::Format("%s FROM read_csv_auto(%s, columns={%s})", select_clause, escaped_path, csv_columns); + + Parser parser(context.GetParserOptions()); + parser.ParseQuery(sub_query_string); + auto select_stmt = unique_ptr_cast(std::move(parser.statements[0])); + + return duckdb::make_uniq(std::move(select_stmt)); } -void InMemoryLogStorage::WriteLogEntry(timestamp_t timestamp, LogLevel level, const string &log_type, - const string &log_message, const RegisteredLoggingContext &context) { - unique_lock lck(lock); +unique_ptr FileLogStorage::BindReplace(ClientContext &context, TableFunctionBindInput &input, + LoggingTargetTable table) { + lock_guard lck(lock); - if (registered_contexts.find(context.context_id) == registered_contexts.end()) { - WriteLoggingContext(context); + // We only allow scanning enabled tables + if (!IsEnabledInternal(table)) { + return nullptr; } - auto size = entry_buffer->size(); - auto context_id_data = FlatVector::GetData(entry_buffer->data[0]); - auto timestamp_data = FlatVector::GetData(entry_buffer->data[1]); - auto type_data = FlatVector::GetData(entry_buffer->data[2]); - auto level_data = FlatVector::GetData(entry_buffer->data[3]); - auto message_data = FlatVector::GetData(entry_buffer->data[4]); + // We start by flushing the table to ensure we scan the latest version + FlushInternal(table); - context_id_data[size] = context.context_id; - timestamp_data[size] = timestamp; - type_data[size] = StringVector::AddString(entry_buffer->data[2], log_type); - level_data[size] = StringVector::AddString(entry_buffer->data[3], EnumUtil::ToString(level)); - message_data[size] = StringVector::AddString(entry_buffer->data[4], log_message); + if (normalize_contexts && table == LoggingTargetTable::ALL_LOGS) { + throw InvalidConfigurationException("Can not scan ALL_LOGS table when logs are normalized"); + } + if (!normalize_contexts && table != LoggingTargetTable::ALL_LOGS) { + throw InvalidConfigurationException("Can only scan ALL_LOGS table when logs are normalized"); + } + + string select = "SELECT *"; + string path = tables[table].path; + + string columns; + if (table == LoggingTargetTable::LOG_ENTRIES) { + columns = "'context_id': 'UBIGINT', 'timestamp': 'TIMESTAMP', 'type': 'VARCHAR', 'log_level': 'VARCHAR' , " + "'message': 'VARCHAR'"; + } else if (table == LoggingTargetTable::LOG_CONTEXTS) { + columns = "'context_id': 'UBIGINT', 'scope': 'VARCHAR', 'connection_id': 'UBIGINT', 'transaction_id': " + "'UBIGINT', 'query_id': 'UBIGINT', 'thread_id': 'UBIGINT'"; + } else if (table == LoggingTargetTable::ALL_LOGS) { + select = "SELECT context_id, scope, connection_id, transaction_id, query_id, thread_id, timestamp, type, " + "log_level, message "; + columns = "'context_id': 'UBIGINT', 'scope': 'VARCHAR', 'connection_id': 'UBIGINT', 'transaction_id': " + "'UBIGINT', 'query_id': 'UBIGINT', 'thread_id': 'UBIGINT', 'timestamp': 'TIMESTAMP', 'type': " + "'VARCHAR', 'log_level': 'VARCHAR' , 'message': 'VARCHAR'"; + } else { + throw InternalException("Invalid logging target table"); + } + + return BindReplaceInternal(context, input, path, select, columns); +} - entry_buffer->SetCardinality(size + 1); +BufferingLogStorage::BufferingLogStorage(DatabaseInstance &db_p, idx_t buffer_size, bool normalize) + : normalize_contexts(normalize), buffer_limit(buffer_size) { + ResetLogBuffers(); +} + +void BufferingLogStorage::ResetLogBuffers() { + if (normalize_contexts) { + buffers[LoggingTargetTable::LOG_ENTRIES] = make_uniq(); + buffers[LoggingTargetTable::LOG_CONTEXTS] = make_uniq(); + buffers[LoggingTargetTable::LOG_ENTRIES]->Initialize(Allocator::DefaultAllocator(), + GetSchema(LoggingTargetTable::LOG_ENTRIES), buffer_limit); + buffers[LoggingTargetTable::LOG_CONTEXTS]->Initialize( + Allocator::DefaultAllocator(), GetSchema(LoggingTargetTable::LOG_CONTEXTS), buffer_limit); - if (size + 1 >= max_buffer_size) { - FlushInternal(); + } else { + buffers[LoggingTargetTable::ALL_LOGS] = make_uniq(); + buffers[LoggingTargetTable::ALL_LOGS]->Initialize(Allocator::DefaultAllocator(), + GetSchema(LoggingTargetTable::ALL_LOGS), buffer_limit); } } -void InMemoryLogStorage::WriteLogEntries(DataChunk &chunk, const RegisteredLoggingContext &context) { - log_entries->Append(chunk); +void BufferingLogStorage::ResetAllBuffers() { + ResetLogBuffers(); } -void InMemoryLogStorage::Flush() { - unique_lock lck(lock); - FlushInternal(); +InMemoryLogStorageScanState::InMemoryLogStorageScanState(LoggingTargetTable table) : LogStorageScanState(table) { +} +InMemoryLogStorageScanState::~InMemoryLogStorageScanState() { } -void InMemoryLogStorage::Truncate() { - unique_lock lck(lock); - ResetBuffers(); +InMemoryLogStorage::InMemoryLogStorage(DatabaseInstance &db_p) : BufferingLogStorage(db_p, STANDARD_VECTOR_SIZE, true) { + log_storage_buffers[LoggingTargetTable::LOG_ENTRIES] = + make_uniq(db_p.GetBufferManager(), GetSchema(LoggingTargetTable::LOG_ENTRIES)); + log_storage_buffers[LoggingTargetTable::LOG_CONTEXTS] = + make_uniq(db_p.GetBufferManager(), GetSchema(LoggingTargetTable::LOG_CONTEXTS)); } -void InMemoryLogStorage::FlushInternal() { - if (entry_buffer->size() > 0) { - log_entries->Append(*entry_buffer); - entry_buffer->Reset(); +void InMemoryLogStorage::ResetAllBuffers() { + BufferingLogStorage::ResetAllBuffers(); + + for (const auto &buffer : log_storage_buffers) { + buffer.second->Reset(); } +} - if (log_context_buffer->size() > 0) { - log_contexts->Append(*log_context_buffer); - log_context_buffer->Reset(); +ColumnDataCollection &InMemoryLogStorage::GetBuffer(LoggingTargetTable table) const { + auto res = log_storage_buffers.find(table); + if (res == log_storage_buffers.end()) { + throw InternalException("Failed to find table"); } + return *res->second; } -void InMemoryLogStorage::WriteLoggingContext(const RegisteredLoggingContext &context) { - registered_contexts.insert(context.context_id); +InMemoryLogStorage::~InMemoryLogStorage() { +} - auto size = log_context_buffer->size(); +BufferingLogStorage::~BufferingLogStorage() { +} + +static void WriteLoggingContextsToChunk(DataChunk &chunk, const RegisteredLoggingContext &context, idx_t &col) { + + auto size = chunk.size(); - auto context_id_data = FlatVector::GetData(log_context_buffer->data[0]); + auto context_id_data = FlatVector::GetData(chunk.data[col++]); context_id_data[size] = context.context_id; - auto context_scope_data = FlatVector::GetData(log_context_buffer->data[1]); - context_scope_data[size] = - StringVector::AddString(log_context_buffer->data[1], EnumUtil::ToString(context.context.scope)); + auto context_scope_data = FlatVector::GetData(chunk.data[col]); + context_scope_data[size] = StringVector::AddString(chunk.data[col++], EnumUtil::ToString(context.context.scope)); if (context.context.connection_id.IsValid()) { - auto client_context_data = FlatVector::GetData(log_context_buffer->data[2]); + auto client_context_data = FlatVector::GetData(chunk.data[col++]); client_context_data[size] = context.context.connection_id.GetIndex(); } else { - FlatVector::Validity(log_context_buffer->data[2]).SetInvalid(size); + FlatVector::Validity(chunk.data[col++]).SetInvalid(size); } if (context.context.transaction_id.IsValid()) { - auto client_context_data = FlatVector::GetData(log_context_buffer->data[3]); + auto client_context_data = FlatVector::GetData(chunk.data[col++]); client_context_data[size] = context.context.transaction_id.GetIndex(); } else { - FlatVector::Validity(log_context_buffer->data[3]).SetInvalid(size); + FlatVector::Validity(chunk.data[col++]).SetInvalid(size); } if (context.context.query_id.IsValid()) { - auto client_context_data = FlatVector::GetData(log_context_buffer->data[4]); + auto client_context_data = FlatVector::GetData(chunk.data[col++]); client_context_data[size] = context.context.query_id.GetIndex(); } else { - FlatVector::Validity(log_context_buffer->data[4]).SetInvalid(size); + FlatVector::Validity(chunk.data[col++]).SetInvalid(size); } if (context.context.thread_id.IsValid()) { - auto thread_data = FlatVector::GetData(log_context_buffer->data[5]); + auto thread_data = FlatVector::GetData(chunk.data[col++]); thread_data[size] = context.context.thread_id.GetIndex(); } else { - FlatVector::Validity(log_context_buffer->data[5]).SetInvalid(size); + FlatVector::Validity(chunk.data[col++]).SetInvalid(size); + } + + chunk.SetCardinality(size + 1); +} + +void BufferingLogStorage::WriteLogEntry(timestamp_t timestamp, LogLevel level, const string &log_type, + const string &log_message, const RegisteredLoggingContext &context) { + unique_lock lck(lock); + + auto &log_entries_buffer = + normalize_contexts ? buffers[LoggingTargetTable::LOG_ENTRIES] : buffers[LoggingTargetTable::ALL_LOGS]; + + auto size = log_entries_buffer->size(); + if (size >= buffer_limit) { + throw InternalException("Log buffer limit exceeded: code should have flushed before"); + } + + if (registered_contexts.find(context.context_id) == registered_contexts.end()) { + WriteLoggingContext(context); + // New context_id: we should flush both contexts and entries next time LOG_ENTRIES gets flushed + flush_contexts_on_next_entry_flush = true; + } + + idx_t col = 0; + + if (normalize_contexts) { + auto context_id_data = FlatVector::GetData(log_entries_buffer->data[col++]); + context_id_data[size] = context.context_id; + } else { + WriteLoggingContextsToChunk(*log_entries_buffer, context, col); } - log_context_buffer->SetCardinality(size + 1); + auto timestamp_data = FlatVector::GetData(log_entries_buffer->data[col++]); + timestamp_data[size] = timestamp; - if (size + 1 >= max_buffer_size) { - FlushInternal(); + auto type_data = FlatVector::GetData(log_entries_buffer->data[col]); + type_data[size] = StringVector::AddString(log_entries_buffer->data[col++], log_type); + + auto level_data = FlatVector::GetData(log_entries_buffer->data[col]); + level_data[size] = StringVector::AddString(log_entries_buffer->data[col++], + EnumUtil::ToString(level)); // TODO: do cast on write out + + auto message_data = FlatVector::GetData(log_entries_buffer->data[col]); + message_data[size] = StringVector::AddString(log_entries_buffer->data[col++], log_message); + + log_entries_buffer->SetCardinality(size + 1); + + if (size + 1 >= buffer_limit) { + if (normalize_contexts) { + // Flush all entries + FlushInternal(LoggingTargetTable::LOG_ENTRIES); + // Flush contexts if required + if (flush_contexts_on_next_entry_flush) { + FlushInternal(LoggingTargetTable::LOG_CONTEXTS); + flush_contexts_on_next_entry_flush = false; + } + } else { + FlushInternal(LoggingTargetTable::ALL_LOGS); + } } } -bool InMemoryLogStorage::CanScan() { - return true; +void BufferingLogStorage::WriteLogEntries(DataChunk &chunk, const RegisteredLoggingContext &context) { + throw NotImplementedException("BufferingLogStorage::WriteLogEntries(DataChunk &chunk) not implemented"); } -unique_ptr InMemoryLogStorage::CreateScanEntriesState() const { - return make_uniq(); +void BufferingLogStorage::FlushAll() { + unique_lock lck(lock); + if (!only_flush_on_full_buffer) { + FlushAllInternal(); + } } -bool InMemoryLogStorage::ScanEntries(LogStorageScanState &state, DataChunk &result) const { + +void BufferingLogStorage::Flush(LoggingTargetTable table) { unique_lock lck(lock); - auto &in_mem_scan_state = state.Cast(); - return log_entries->Scan(in_mem_scan_state.scan_state, result); + if (!only_flush_on_full_buffer) { + FlushInternal(table); + } } -void InMemoryLogStorage::InitializeScanEntries(LogStorageScanState &state) const { +void BufferingLogStorage::Truncate() { unique_lock lck(lock); - auto &in_mem_scan_state = state.Cast(); - log_entries->InitializeScan(in_mem_scan_state.scan_state, ColumnDataScanProperties::DISALLOW_ZERO_COPY); + ResetAllBuffers(); +} + +void InMemoryLogStorage::FlushChunk(LoggingTargetTable table, DataChunk &chunk) { + D_ASSERT(table == LoggingTargetTable::LOG_ENTRIES || table == LoggingTargetTable::LOG_CONTEXTS); + log_storage_buffers[table]->Append(chunk); +} + +void BufferingLogStorage::FlushAllInternal() { + if (normalize_contexts) { + FlushInternal(LoggingTargetTable::LOG_ENTRIES); + FlushInternal(LoggingTargetTable::LOG_CONTEXTS); + } else { + FlushInternal(LoggingTargetTable::ALL_LOGS); + } +} + +void BufferingLogStorage::FlushInternal(LoggingTargetTable table) { + if (!IsEnabledInternal(table)) { + throw InvalidConfigurationException("Cannot flush disabled logging target"); + } + FlushChunk(table, *buffers[table]); + buffers[table]->Reset(); +} + +void BufferingLogStorage::WriteLoggingContext(const RegisteredLoggingContext &context) { + registered_contexts.insert(context.context_id); + + // If we don't normalize the contexts they are written out on every log entry + if (!normalize_contexts) { + return; + } + + idx_t col = 0; + + auto &log_contexts_buffer = buffers[LoggingTargetTable::LOG_CONTEXTS]; + + if (log_contexts_buffer->size() + 1 > buffer_limit) { + FlushInternal(LoggingTargetTable::LOG_CONTEXTS); + } + + WriteLoggingContextsToChunk(*log_contexts_buffer, context, col); +} + +bool InMemoryLogStorage::CanScan(LoggingTargetTable table) { + unique_lock lck(lock); + return IsEnabledInternal(table); } -unique_ptr InMemoryLogStorage::CreateScanContextsState() const { - return make_uniq(); +unique_ptr InMemoryLogStorage::CreateScanState(LoggingTargetTable table) const { + return make_uniq(table); } -bool InMemoryLogStorage::ScanContexts(LogStorageScanState &state, DataChunk &result) const { + +bool InMemoryLogStorage::Scan(LogStorageScanState &state, DataChunk &result) const { unique_lock lck(lock); auto &in_mem_scan_state = state.Cast(); - return log_contexts->Scan(in_mem_scan_state.scan_state, result); + return GetBuffer(in_mem_scan_state.table).Scan(in_mem_scan_state.scan_state, result); } -void InMemoryLogStorage::InitializeScanContexts(LogStorageScanState &state) const { +void InMemoryLogStorage::InitializeScan(LogStorageScanState &state) const { unique_lock lck(lock); auto &in_mem_scan_state = state.Cast(); - log_contexts->InitializeScan(in_mem_scan_state.scan_state, ColumnDataScanProperties::DISALLOW_ZERO_COPY); + GetBuffer(in_mem_scan_state.table) + .InitializeScan(in_mem_scan_state.scan_state, ColumnDataScanProperties::DISALLOW_ZERO_COPY); } } // namespace duckdb diff --git a/src/duckdb/src/main/http/http_util.cpp b/src/duckdb/src/main/http/http_util.cpp index 022682789..2a470c45a 100644 --- a/src/duckdb/src/main/http/http_util.cpp +++ b/src/duckdb/src/main/http/http_util.cpp @@ -227,7 +227,13 @@ unique_ptr HTTPUtil::SendRequest(BaseRequest &request, unique_ptr< } std::function(void)> on_request([&]() { - auto response = client->Request(request); + unique_ptr response; + try { + response = client->Request(request); + } catch (...) { + LogRequest(request, nullptr); + throw; + } LogRequest(request, response ? response.get() : nullptr); return response; }); diff --git a/src/duckdb/src/parser/transform/statement/transform_pivot_stmt.cpp b/src/duckdb/src/parser/transform/statement/transform_pivot_stmt.cpp index 94bafd72f..07dfb420d 100644 --- a/src/duckdb/src/parser/transform/statement/transform_pivot_stmt.cpp +++ b/src/duckdb/src/parser/transform/statement/transform_pivot_stmt.cpp @@ -125,6 +125,8 @@ unique_ptr Transformer::CreatePivotStatement(unique_ptrstatements.push_back(GenerateCreateEnumStmt(std::move(pivot))); } + result->stmt_location = statement->stmt_location; + result->stmt_length = statement->stmt_length; result->statements.push_back(std::move(statement)); // FIXME: drop the types again!? // for(auto &pivot : pivot_entries) { diff --git a/src/duckdb/src/planner/binder/expression/bind_operator_expression.cpp b/src/duckdb/src/planner/binder/expression/bind_operator_expression.cpp index a42710518..d5baa1c8a 100644 --- a/src/duckdb/src/planner/binder/expression/bind_operator_expression.cpp +++ b/src/duckdb/src/planner/binder/expression/bind_operator_expression.cpp @@ -121,10 +121,18 @@ BindResult ExpressionBinder::BindExpression(OperatorExpression &op, idx_t depth) function_name = "json_extract"; // Make sure we only extract array elements, not fields, by adding the $[] syntax auto &i_exp = BoundExpression::GetExpression(*op.children[1]); - if (i_exp->GetExpressionClass() == ExpressionClass::BOUND_CONSTANT) { + if (i_exp->GetExpressionClass() == ExpressionClass::BOUND_CONSTANT && + !i_exp->Cast().value.IsNull()) { auto &const_exp = i_exp->Cast(); - if (!const_exp.value.IsNull()) { - const_exp.value = StringUtil::Format("$[%s]", const_exp.value.ToString()); + if (const_exp.value.TryCastAs(context, LogicalType::UINTEGER)) { + // Array extraction: if the cast fails it's definitely out-of-bounds for a JSON array + auto index = UIntegerValue::Get(const_exp.value); + index -= index > 0; // Subtract 1 for SQL 1-based indexing (except when accessing from back) + const_exp.value = StringUtil::Format("$[%lld]", index); + const_exp.return_type = LogicalType::VARCHAR; + } else if (const_exp.return_type.id() == LogicalType::VARCHAR) { + // Field extraction + const_exp.value = StringUtil::Format("$.\"%s\"", const_exp.value.ToString()); const_exp.return_type = LogicalType::VARCHAR; } } diff --git a/src/duckdb/src/planner/pragma_handler.cpp b/src/duckdb/src/planner/pragma_handler.cpp index 7e0b8eab7..dda722200 100644 --- a/src/duckdb/src/planner/pragma_handler.cpp +++ b/src/duckdb/src/planner/pragma_handler.cpp @@ -25,7 +25,7 @@ void PragmaHandler::HandlePragmaStatementsInternal(vectortype == StatementType::MULTI_STATEMENT) { auto &multi_statement = statements[i]->Cast(); for (auto &stmt : multi_statement.statements) { - statements.push_back(std::move(stmt)); + new_statements.push_back(std::move(stmt)); } continue; } diff --git a/src/duckdb/ub_src_common.cpp b/src/duckdb/ub_src_common.cpp index bed4f4b00..c51f91c48 100644 --- a/src/duckdb/ub_src_common.cpp +++ b/src/duckdb/ub_src_common.cpp @@ -8,6 +8,8 @@ #include "src/common/cgroups.cpp" +#include "src/common/csv_writer.cpp" + #include "src/common/complex_json.cpp" #include "src/common/compressed_file_system.cpp" diff --git a/src/duckdb/ub_src_function_table_system.cpp b/src/duckdb/ub_src_function_table_system.cpp index f2898802b..afa17b21b 100644 --- a/src/duckdb/ub_src_function_table_system.cpp +++ b/src/duckdb/ub_src_function_table_system.cpp @@ -50,6 +50,8 @@ #include "src/function/table/system/duckdb_views.cpp" +#include "src/function/table/system/logging_utils.cpp" + #include "src/function/table/system/pragma_collations.cpp" #include "src/function/table/system/pragma_database_size.cpp"