diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/db2/DB2Catalog.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/db2/DB2Catalog.java new file mode 100644 index 00000000000..6606b3a86ed --- /dev/null +++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/db2/DB2Catalog.java @@ -0,0 +1,235 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.db2; + +import org.apache.seatunnel.api.table.catalog.CatalogTable; +import org.apache.seatunnel.api.table.catalog.Column; +import org.apache.seatunnel.api.table.catalog.PrimaryKey; +import org.apache.seatunnel.api.table.catalog.TablePath; +import org.apache.seatunnel.api.table.catalog.exception.CatalogException; +import org.apache.seatunnel.api.table.converter.BasicTypeDefine; +import org.apache.seatunnel.common.utils.JdbcUrlUtil; +import org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.AbstractJdbcCatalog; +import org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.utils.CatalogUtils; +import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.db2.DB2TypeConverter; +import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.db2.DB2TypeMapper; + +import org.apache.commons.lang3.StringUtils; + +import lombok.SneakyThrows; +import lombok.extern.slf4j.Slf4j; + +import java.sql.Connection; +import java.sql.DatabaseMetaData; +import java.sql.DriverManager; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.sql.Statement; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.concurrent.ConcurrentHashMap; + +@Slf4j +public class DB2Catalog extends AbstractJdbcCatalog { + + protected final Map connectionMap; + + private static final String SELECT_COLUMNS_SQL = + "SELECT NAME AS column_name,\n" + + " TYPENAME AS type_name,\n" + + " TYPENAME AS full_type_name,\n" + + " (CASE WHEN LENGTH = -1 THEN LONGLENGTH ELSE LENGTH END) AS column_length,\n" + + " SCALE AS column_scale,\n" + + " REMARKS AS column_comment,\n" + + " DEFAULT AS default_value,\n" + + " NULLS AS is_nullable\n" + + "FROM SYSIBM.SYSCOLUMNS WHERE TBCREATOR = '%s' AND TBNAME = '%s' ORDER BY COLNO ASC"; + + public DB2Catalog( + String catalogName, + String username, + String pwd, + JdbcUrlUtil.UrlInfo urlInfo, + String defaultSchema) { + super(catalogName, username, pwd, urlInfo, defaultSchema); + this.connectionMap = new ConcurrentHashMap<>(); + } + + @SneakyThrows + @Override + public List listDatabases() throws CatalogException { + try (ResultSet resultSet = + getConnection(getUrlFromDatabaseName(defaultDatabase)) + .getMetaData() + .getCatalogs()) { + return Collections.singletonList(resultSet.getMetaData().getCatalogName(1)); + } + } + + protected String getTableWithConditionSql(TablePath tablePath) { + return getListTableSql(tablePath.getDatabaseName()) + + " and TABSCHEMA = '" + + tablePath.getSchemaName() + + "' and TABNAME = '" + + tablePath.getTableName() + + "'"; + } + + @Override + protected String getSelectColumnsSql(TablePath tablePath) { + return String.format( + SELECT_COLUMNS_SQL, tablePath.getSchemaName(), tablePath.getTableName()); + } + + @Override + protected String getListTableSql(String databaseName) { + return "SELECT TABSCHEMA , TABNAME FROM SYSCAT.TABLES WHERE TABSCHEMA NOT IN ('SYSCAT','SYSIBM','SYSIBMADM','SYSPUBLIC','SYSSTAT','SYSTOOLS')"; + } + + @Override + protected String getCreateTableSql( + TablePath tablePath, CatalogTable table, boolean createIndex) { + String createTableSql = new DB2CreateTableSqlBuilder(table).build(tablePath); + return CatalogUtils.getFieldIde(createTableSql, table.getOptions().get("fieldIde")); + } + + @Override + protected String getDropTableSql(TablePath tablePath) { + return String.format( + "DROP TABLE IF EXISTS %s.%s ", + tablePath.getSchemaName(), "\"" + tablePath.getTableName() + "\""); + } + + @Override + protected String getTruncateTableSql(TablePath tablePath) { + return String.format( + "ALTER TABLE %s.%s ACTIVATE NOT LOGGED INITIALLY WITH EMPTY TABLE", + tablePath.getSchemaName(), "\"" + tablePath.getTableName() + "\""); + } + + protected Optional getPrimaryKey( + DatabaseMetaData metaData, String database, String schema, String table) + throws SQLException { + List fields = getPrimaryKeyFieldList(schema, table); + if (fields == null || fields.isEmpty()) { + return Optional.empty(); + } + return Optional.of(PrimaryKey.of(getPrimaryKeyName(schema, table), fields)); + } + + private List getPrimaryKeyFieldList(String schema, String table) { + String getPrimaryKeyFieldSql = + String.format( + "SELECT COLNAME FROM SYSCAT.KEYCOLUSE WHERE TABSCHEMA = '%s' AND TABNAME = '%s';", + schema, table); + Connection connection = getConnection(getUrlFromDatabaseName(defaultDatabase)); + List primaryKeyColNameList = new ArrayList<>(); + try (Statement ps = connection.createStatement(); + ResultSet resultSet = ps.executeQuery(getPrimaryKeyFieldSql)) { + + while (resultSet.next()) { + String primaryKeyColName = resultSet.getString("COLNAME"); + primaryKeyColNameList.add(primaryKeyColName); + } + return primaryKeyColNameList; + } catch (SQLException e) { + throw new CatalogException( + String.format("Failed getPrimaryKeyFieldList table %s", table), e); + } + } + + private String getPrimaryKeyName(String schema, String table) { + String getPrimaryKeyNameSql = + String.format( + "SELECT INDNAME FROM SYSCAT.INDEXES WHERE UNIQUERULE = 'P' AND TABSCHEMA = '%s' AND TABNAME = '%s' ;", + schema, table); + Connection connection = getConnection(getUrlFromDatabaseName(defaultDatabase)); + try (Statement ps = connection.createStatement(); + ResultSet resultSet = ps.executeQuery(getPrimaryKeyNameSql)) { + + while (resultSet.next()) { + String primaryKeyColName = resultSet.getString("INDNAME"); + if (StringUtils.isNotEmpty(primaryKeyColName)) { + return primaryKeyColName; + } + } + } catch (SQLException e) { + throw new CatalogException( + String.format("Failed getPrimaryKeyName table %s", table), e); + } + return null; + } + + @Override + public String getExistDataSql(TablePath tablePath) { + return String.format( + "select * from %s.%s FETCH FIRST 1 ROW ONLY;", + tablePath.getSchemaName(), "\"" + tablePath.getTableName() + "\""); + } + + public Connection getConnection(String url) { + if (connectionMap.containsKey(url)) { + return connectionMap.get(url); + } + try { + Connection connection = DriverManager.getConnection(url, username, pwd); + connectionMap.put(url, connection); + return connection; + } catch (SQLException e) { + throw new CatalogException(String.format("Failed connecting to %s via JDBC.", url), e); + } + } + + @Override + protected Column buildColumn(ResultSet resultSet) throws SQLException { + String columnName = resultSet.getString("column_name"); + String typeName = resultSet.getString("type_name").trim(); + String fullTypeName = resultSet.getString("full_type_name").trim(); + long columnLength = resultSet.getLong("column_length"); + int columnScale = resultSet.getInt("column_scale"); + String columnComment = resultSet.getString("column_comment"); + Object defaultValue = resultSet.getObject("default_value"); + boolean isNullable = resultSet.getString("is_nullable").equals("Y"); + + BasicTypeDefine typeDefine = + BasicTypeDefine.builder() + .name(columnName) + .columnType(fullTypeName) + .dataType(typeName) + .length(columnLength) + .precision(columnLength) + .scale(columnScale) + .nullable(isNullable) + .defaultValue(defaultValue) + .comment(columnComment) + .build(); + return DB2TypeConverter.INSTANCE.convert(typeDefine); + } + + @Override + public CatalogTable getTable(String sqlQuery) throws SQLException { + return CatalogUtils.getCatalogTable( + getConnection(getUrlFromDatabaseName(defaultDatabase)), + sqlQuery, + new DB2TypeMapper()); + } +} diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/db2/DB2CatalogFactory.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/db2/DB2CatalogFactory.java new file mode 100644 index 00000000000..684fe2508da --- /dev/null +++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/db2/DB2CatalogFactory.java @@ -0,0 +1,70 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.db2; + +import org.apache.seatunnel.shade.com.google.common.base.Preconditions; + +import org.apache.seatunnel.api.configuration.ReadonlyConfig; +import org.apache.seatunnel.api.configuration.util.OptionRule; +import org.apache.seatunnel.api.configuration.util.OptionValidationException; +import org.apache.seatunnel.api.table.catalog.Catalog; +import org.apache.seatunnel.api.table.factory.CatalogFactory; +import org.apache.seatunnel.api.table.factory.Factory; +import org.apache.seatunnel.common.utils.JdbcUrlUtil; +import org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.JdbcCatalogOptions; +import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.DatabaseIdentifier; + +import org.apache.commons.lang3.StringUtils; + +import com.google.auto.service.AutoService; + +import java.util.Optional; + +@AutoService(Factory.class) +public class DB2CatalogFactory implements CatalogFactory { + + @Override + public String factoryIdentifier() { + return DatabaseIdentifier.DB_2; + } + + @Override + public Catalog createCatalog(String catalogName, ReadonlyConfig options) { + String urlWithDatabase = options.get(JdbcCatalogOptions.BASE_URL); + Preconditions.checkArgument( + StringUtils.isNotBlank(urlWithDatabase), + "Miss config ! Please check your config."); + JdbcUrlUtil.UrlInfo urlInfo = JdbcUrlUtil.getUrlInfo(urlWithDatabase); + Optional defaultDatabase = urlInfo.getDefaultDatabase(); + if (!defaultDatabase.isPresent()) { + throw new OptionValidationException(JdbcCatalogOptions.BASE_URL); + } + return new DB2Catalog( + catalogName, + options.get(JdbcCatalogOptions.USERNAME), + options.get(JdbcCatalogOptions.PASSWORD), + urlInfo, + options.get(JdbcCatalogOptions.SCHEMA)); + } + + @Override + public OptionRule optionRule() { + return JdbcCatalogOptions.BASE_RULE.build(); + } +} diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/db2/DB2CreateTableSqlBuilder.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/db2/DB2CreateTableSqlBuilder.java new file mode 100644 index 00000000000..16ca37177de --- /dev/null +++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/db2/DB2CreateTableSqlBuilder.java @@ -0,0 +1,201 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.db2; + +import org.apache.seatunnel.api.table.catalog.CatalogTable; +import org.apache.seatunnel.api.table.catalog.Column; +import org.apache.seatunnel.api.table.catalog.ConstraintKey; +import org.apache.seatunnel.api.table.catalog.PrimaryKey; +import org.apache.seatunnel.api.table.catalog.TablePath; +import org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.utils.CatalogUtils; +import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.DatabaseIdentifier; +import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.db2.DB2TypeConverter; + +import org.apache.commons.collections4.CollectionUtils; +import org.apache.commons.lang3.StringUtils; + +import java.util.List; +import java.util.UUID; +import java.util.stream.Collectors; + +public class DB2CreateTableSqlBuilder { + + private List columns; + private PrimaryKey primaryKey; + private String sourceCatalogName; + private String fieldIde; + private List constraintKeys; + public Boolean isHaveConstraintKey = false; + + public DB2CreateTableSqlBuilder(CatalogTable catalogTable) { + this.columns = catalogTable.getTableSchema().getColumns(); + this.primaryKey = catalogTable.getTableSchema().getPrimaryKey(); + this.sourceCatalogName = catalogTable.getCatalogName(); + this.fieldIde = catalogTable.getOptions().get("fieldIde"); + constraintKeys = catalogTable.getTableSchema().getConstraintKeys(); + } + + public String build(TablePath tablePath) { + StringBuilder createTableSql = new StringBuilder(); + createTableSql + .append(CatalogUtils.quoteIdentifier("CREATE TABLE IF NOT EXISTS ", fieldIde)) + .append(tablePath.getSchemaAndTableName("\"")) + .append(" (\n"); + + List columnSqls = + columns.stream() + .map( + column -> + CatalogUtils.quoteIdentifier( + buildColumnSql(column), fieldIde)) + .collect(Collectors.toList()); + + if (primaryKey != null && !primaryKey.getColumnNames().isEmpty()) { + String key = + primaryKey.getColumnNames().stream() + .map( + columnName -> + "\"" + + CatalogUtils.getFieldIde(columnName, fieldIde) + + "\"") + .collect(Collectors.joining(", ")); + columnSqls.add("PRIMARY KEY ( " + key + " )"); + } + + if (CollectionUtils.isNotEmpty(constraintKeys)) { + for (ConstraintKey constraintKey : constraintKeys) { + if (StringUtils.isBlank(constraintKey.getConstraintName()) + || (primaryKey != null + && StringUtils.equals( + primaryKey.getPrimaryKey(), + constraintKey.getConstraintName()))) { + continue; + } + String constraintKeySql = buildConstraintKeySql(constraintKey); + if (StringUtils.isNotEmpty(constraintKeySql)) { + columnSqls.add("\t" + constraintKeySql); + isHaveConstraintKey = true; + } + } + } + + createTableSql.append(String.join(",\n", columnSqls)); + createTableSql.append("\n);"); + + List commentSqls = + columns.stream() + .filter(column -> StringUtils.isNotBlank(column.getComment())) + .map( + columns -> + buildColumnCommentSql( + columns, tablePath.getSchemaAndTableName("\""))) + .collect(Collectors.toList()); + + if (!commentSqls.isEmpty()) { + createTableSql.append("\n"); + createTableSql.append(String.join(";\n", commentSqls)).append(";"); + } + + return createTableSql.toString(); + } + + private String buildColumnSql(Column column) { + StringBuilder columnSql = new StringBuilder(); + columnSql.append("\"").append(column.getName()).append("\" "); + + // For simplicity, assume the column type in SeaTunnelDataType is the same as in DB2 + String columnType = + sourceCatalogName.equals(DatabaseIdentifier.DB_2) + && StringUtils.isNotBlank(column.getSourceType()) + ? column.getSourceType() + : DB2TypeConverter.INSTANCE.reconvert(column).getColumnType(); + columnSql.append(columnType); + + // Add NOT NULL if column is not nullable + if (!column.isNullable()) { + columnSql.append(" NOT NULL"); + } + + return columnSql.toString(); + } + + private String buildColumnCommentSql(Column column, String tableName) { + StringBuilder columnCommentSql = new StringBuilder(); + columnCommentSql + .append(CatalogUtils.quoteIdentifier("COMMENT ON COLUMN ", fieldIde)) + .append(tableName) + .append("."); + columnCommentSql + .append(CatalogUtils.quoteIdentifier(column.getName(), fieldIde, "\"")) + .append(CatalogUtils.quoteIdentifier(" IS '", fieldIde)) + .append(column.getComment().replace("'", "''").replace("\\", "\\\\")) + .append("'"); + return columnCommentSql.toString(); + } + + private String buildConstraintKeySql(ConstraintKey constraintKey) { + ConstraintKey.ConstraintType constraintType = constraintKey.getConstraintType(); + String randomSuffix = UUID.randomUUID().toString().replace("-", "").substring(0, 4); + + String constraintName = constraintKey.getConstraintName(); + if (constraintName.length() > 25) { + constraintName = constraintName.substring(0, 25); + } + String indexColumns = + constraintKey.getColumnNames().stream() + .map( + constraintKeyColumn -> + String.format( + "\"%s\"", + CatalogUtils.getFieldIde( + constraintKeyColumn.getColumnName(), + fieldIde))) + .collect(Collectors.joining(", ")); + + String keyName = null; + switch (constraintType) { + case INDEX_KEY: + keyName = "KEY"; + break; + case UNIQUE_KEY: + keyName = "UNIQUE"; + break; + case FOREIGN_KEY: + keyName = "FOREIGN KEY"; + // todo: + break; + default: + throw new UnsupportedOperationException( + "Unsupported constraint type: " + constraintType); + } + + if (StringUtils.equals(keyName, "UNIQUE")) { + isHaveConstraintKey = true; + return "CONSTRAINT " + + constraintName + + "_" + + randomSuffix + + " UNIQUE (" + + indexColumns + + ")"; + } + // todo KEY AND FOREIGN_KEY + return null; + } +} diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcDb2IT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcDb2IT.java index a876d9bf7a0..bbe6d9684fe 100644 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcDb2IT.java +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcDb2IT.java @@ -20,6 +20,8 @@ import org.apache.seatunnel.api.table.type.SeaTunnelRow; import org.apache.seatunnel.common.exception.SeaTunnelRuntimeException; +import org.apache.seatunnel.common.utils.JdbcUrlUtil; +import org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.db2.DB2Catalog; import org.apache.commons.lang3.tuple.Pair; @@ -53,7 +55,8 @@ public class JdbcDb2IT extends AbstractJdbcIT { private static final String DRIVER_CLASS = "com.ibm.db2.jcc.DB2Driver"; private static final List CONFIG_FILE = - Lists.newArrayList("/jdbc_db2_source_and_sink.conf"); + Lists.newArrayList( + "/jdbc_db2_source_and_sink.conf", "/jdbc_db2_source_and_save_mode_sink.conf"); /** db2 in dockerhub */ private static final String DB2_IMAGE = "ibmcom/db2"; @@ -214,4 +217,17 @@ public void clearTable(String schema, String table) { throw new SeaTunnelRuntimeException(JdbcITErrorCode.CLEAR_TABLE_FAILED, e); } } + + @Override + protected void initCatalog() { + catalog = + new DB2Catalog( + "DB2", + jdbcCase.getUserName(), + jdbcCase.getPassword(), + JdbcUrlUtil.getUrlInfo( + jdbcCase.getJdbcUrl().replace(HOST, dbServer.getHost())), + "E2E"); + catalog.open(); + } } diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/resources/jdbc_db2_source_and_save_mode_sink.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/resources/jdbc_db2_source_and_save_mode_sink.conf new file mode 100644 index 00000000000..e4e045b4242 --- /dev/null +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/resources/jdbc_db2_source_and_save_mode_sink.conf @@ -0,0 +1,52 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +env { + parallelism = 1 + job.mode = "BATCH" +} + +source { + # This is a example source plugin **only for test and demonstrate the feature source plugin** + Jdbc { + driver = com.ibm.db2.jcc.DB2Driver + url = "jdbc:db2://db2-e2e:50000/E2E" + user = "db2inst1" + password = "123456" + query = """ + select * from "E2E".SOURCE; + """ + } + + # If you would like to get more information about how to configure seatunnel and see full list of source plugins, + # please go to https://seatunnel.apache.org/docs/connector-v2/source/Jdbc +} + +sink { + Jdbc { + driver = com.ibm.db2.jcc.DB2Driver + url = "jdbc:db2://db2-e2e:50000/E2E" + user = "db2inst1" + password = "123456" + table = E2E.${table_name}_test + "schema_save_mode"="RECREATE_SCHEMA" + "data_save_mode"="DROP_DATA" + } + + # If you would like to get more information about how to configure seatunnel and see full list of sink plugins, + # please go to https://seatunnel.apache.org/docs/connector-v2/sink/Jdbc +}