diff --git a/connectors/paimon-connector/src/main/java/io/tapdata/connector/paimon/service/PaimonService.java b/connectors/paimon-connector/src/main/java/io/tapdata/connector/paimon/service/PaimonService.java index c406fd0b..4392f9d9 100644 --- a/connectors/paimon-connector/src/main/java/io/tapdata/connector/paimon/service/PaimonService.java +++ b/connectors/paimon-connector/src/main/java/io/tapdata/connector/paimon/service/PaimonService.java @@ -16,14 +16,14 @@ import io.tapdata.pdk.apis.context.TapConnectorContext; import io.tapdata.pdk.apis.entity.WriteListResult; import org.apache.hadoop.conf.Configuration; -import org.apache.paimon.catalog.Catalog; -import org.apache.paimon.catalog.CatalogContext; -import org.apache.paimon.catalog.CatalogFactory; -import org.apache.paimon.catalog.Identifier; +import org.apache.hadoop.fs.FileSystem; +import org.apache.paimon.catalog.*; import org.apache.paimon.data.BinaryString; import org.apache.paimon.data.Decimal; import org.apache.paimon.data.GenericRow; import org.apache.paimon.data.Timestamp; +import org.apache.paimon.fs.FileIO; +import org.apache.paimon.fs.hadoop.HadoopFileIO; import org.apache.paimon.options.Options; import org.apache.paimon.schema.Schema; import org.apache.paimon.table.Table; @@ -34,6 +34,7 @@ import org.apache.paimon.types.RowKind; import java.io.Closeable; +import java.lang.reflect.Field; import java.math.BigDecimal; import java.util.*; import java.util.concurrent.*; @@ -255,6 +256,7 @@ private Configuration buildHadoopConfiguration() { // NoClassDefFoundError due to classloader/version conflicts. // Ensure S3A filesystem is used when scheme is s3a conf.set("fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem"); + conf.set("fs.s3a.impl.disable.cache", "true"); conf.set("fs.AbstractFileSystem.s3a.impl", "org.apache.hadoop.fs.s3a.S3A"); } @@ -796,11 +798,11 @@ private WriteListResult writeRecordsWithStreamWriteInternal(List WriteListResult result = new WriteListResult<>(); Identifier identifier = Identifier.create(database, tableName); - // Get or create cached writer and commit - StreamTableWrite writer = getOrCreateStreamWriter(tableKey, identifier); - StreamTableCommit commit = getOrCreateStreamCommit(tableKey, identifier); - try { + // Get or create cached writer and commit + StreamTableWrite writer = getOrCreateStreamWriter(tableKey, identifier); + StreamTableCommit commit = getOrCreateStreamCommit(tableKey, identifier); + // Write all records to the writer for (TapRecordEvent event : recordEvents) { if (event instanceof TapInsertRecordEvent) { @@ -946,11 +948,30 @@ private void cleanupAllResources() { // Close old catalog if exists if (catalog != null) { try { + if (catalog instanceof CachingCatalog) { + CachingCatalog cachingCatalog = (CachingCatalog) catalog; + Catalog wrapped = cachingCatalog.wrapped(); + if (wrapped instanceof FileSystemCatalog) { + FileSystemCatalog fileSystemCatalog = (FileSystemCatalog) wrapped; + FileIO fileIO = null; + try { + fileIO = fileSystemCatalog.fileIO(); + } catch (Throwable ignore) { + // Ignore fileIO lookup errors + } + + // Best-effort close: proactively close FileSystem instances cached by HadoopFileIO + closeHadoopFileIOCachedFileSystems(fileIO); + closeQuietly(fileIO); + } + } + catalog.close(); - } catch (Exception e) { + } catch (Throwable e) { // Ignore close errors + } finally { + catalog = null; } - catalog = null; } // Wait a bit to ensure all internal threads are cleaned up @@ -962,6 +983,66 @@ private void cleanupAllResources() { } } + private void closeQuietly(Closeable closeable) { + if (closeable == null) { + return; + } + try { + closeable.close(); + } catch (Exception ignore) { + // Ignore close errors + } + } + + /** + * Best-effort close for cached Hadoop FileSystem instances inside Paimon HadoopFileIO. + *

+ * HadoopFileIO may cache FileSystem instances (e.g., in a field named "fsMap"). Even if + * Hadoop global FileSystem cache is disabled, this internal cache can still keep an S3A + * FileSystem whose thread factory captured a Task ThreadGroup that will be destroyed later. + */ + private void closeHadoopFileIOCachedFileSystems(Object fileIO) { + if (!(fileIO instanceof HadoopFileIO)) { + return; + } + + try { + Field fsMapField = fileIO.getClass().getDeclaredField("fsMap"); + fsMapField.setAccessible(true); + Object fsMapObject = fsMapField.get(fileIO); + if (!(fsMapObject instanceof Map)) { + return; + } + + Map fsMap = (Map) fsMapObject; + if (fsMap.isEmpty()) { + return; + } + + // Copy values first to avoid ConcurrentModificationException in case close triggers internal updates. + List fileSystems = new ArrayList<>(fsMap.values()); + for (Object fs : fileSystems) { + if (fs instanceof FileSystem) { + try { + ((FileSystem) fs).close(); + } catch (Exception ignore) { + // Ignore close errors + } + } + } + + try { + fsMap.clear(); + } catch (Exception ignore) { + // Ignore clear errors + } + } catch (NoSuchFieldException ignore) { + // HadoopFileIO implementation differs; ignore. + } catch (Throwable ignore) { + // Best-effort only + } + } + /** * Check if the exception is caused by ThreadGroup being destroyed. * This typically happens when the classloader that created Paimon's thread factory