Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Vertica DB support on dataframe-jdbc #604

Open
wants to merge 9 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
1 change: 1 addition & 0 deletions dataframe-jdbc/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ dependencies {
testImplementation(libs.postgresql)
testImplementation(libs.mysql)
testImplementation(libs.h2db)
testImplementation(libs.vertica)
testImplementation(libs.junit)
testImplementation(libs.sl4j)
testImplementation(libs.kotestAssertions) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
package org.jetbrains.kotlinx.dataframe.io.db

import org.jetbrains.kotlinx.dataframe.io.TableColumnMetadata
import org.jetbrains.kotlinx.dataframe.io.TableMetadata
import org.jetbrains.kotlinx.dataframe.schema.ColumnSchema
import java.sql.ResultSet
import kotlin.reflect.KType
import kotlin.reflect.full.createType

/**
* Represents the Vertica database type.
*
* This class provides methods to convert data from a ResultSet to the appropriate type for Vertica,
* and to generate the corresponding column schema.
*/
public object Vertica : DbType("vertica") {
override val driverClassName: String
get() = "com.vertica.jdbc.Driver"

override fun convertSqlTypeToColumnSchemaValue(tableColumnMetadata: TableColumnMetadata): ColumnSchema? =
when(tableColumnMetadata.sqlTypeName.uppercase()) {
"UUID" -> ColumnSchema.Value(String::class.createType(nullable = tableColumnMetadata.isNullable))
"ARRAY" -> ColumnSchema.Value(String::class.createType(nullable = tableColumnMetadata.isNullable))
"UNKNOWN" -> ColumnSchema.Value(String::class.createType(nullable = tableColumnMetadata.isNullable))
else -> null
}

override fun isSystemTable(tableMetadata: TableMetadata): Boolean {
val schemaName = tableMetadata.schemaName

return schemaName?.startsWith("v_", true) ?: false
}

override fun buildTableMetadata(tables: ResultSet): TableMetadata {
return TableMetadata(
tables.getString("table_name"),
tables.getString("table_schem"),
tables.getString("table_cat"))
}

override fun convertSqlTypeToKType(tableColumnMetadata: TableColumnMetadata): KType? =
when(tableColumnMetadata.sqlTypeName.uppercase()) {
"UUID" -> String::class.createType(nullable = tableColumnMetadata.isNullable)
"ARRAY" -> String::class.createType(nullable = tableColumnMetadata.isNullable)
"UNKNOWN" -> String::class.createType(nullable = tableColumnMetadata.isNullable)
else -> null
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,9 @@ public fun extractDBTypeFromUrl(url: String?): DbType {
MySql.dbTypeInJdbcUrl in url -> MySql
Sqlite.dbTypeInJdbcUrl in url -> Sqlite
PostgreSql.dbTypeInJdbcUrl in url -> PostgreSql
Vertica.dbTypeInJdbcUrl in url -> Vertica
else -> throw IllegalArgumentException("Unsupported database type in the url: $url. " +
"Only H2, MariaDB, MySQL, SQLite and PostgreSQL are supported!")
"Only H2, MariaDB, MySQL, SQLite, PostgreSQL and Vertica are supported!")
}
} else {
throw SQLException("Database URL could not be null. The existing value is $url")
Expand Down
Original file line number Diff line number Diff line change
@@ -1,31 +1,32 @@
package org.jetbrains.kotlinx.dataframe.io

import io.github.oshai.kotlinlogging.KotlinLogging
import org.jetbrains.kotlinx.dataframe.AnyFrame
import org.jetbrains.kotlinx.dataframe.DataColumn
import org.jetbrains.kotlinx.dataframe.DataFrame
import org.jetbrains.kotlinx.dataframe.api.toDataFrame
import org.jetbrains.kotlinx.dataframe.impl.schema.DataFrameSchemaImpl
import org.jetbrains.kotlinx.dataframe.io.db.DbType
import org.jetbrains.kotlinx.dataframe.io.db.Vertica
import org.jetbrains.kotlinx.dataframe.io.db.extractDBTypeFromUrl
import org.jetbrains.kotlinx.dataframe.schema.ColumnSchema
import org.jetbrains.kotlinx.dataframe.schema.DataFrameSchema
import java.math.BigDecimal
import java.sql.Blob
import java.sql.Clob
import java.sql.Connection
import java.sql.DatabaseMetaData
import java.sql.DriverManager
import java.sql.NClob
import java.sql.Ref
import java.sql.ResultSet
import java.sql.ResultSetMetaData
import java.sql.RowId
import java.sql.SQLXML
import java.sql.Time
import java.sql.Timestamp
import java.sql.Types
import java.sql.RowId
import java.sql.Ref
import java.sql.Clob
import java.sql.Blob
import java.sql.NClob
import java.sql.SQLXML
import java.util.Date
import org.jetbrains.kotlinx.dataframe.AnyFrame
import org.jetbrains.kotlinx.dataframe.DataColumn
import org.jetbrains.kotlinx.dataframe.DataFrame
import org.jetbrains.kotlinx.dataframe.api.toDataFrame
import org.jetbrains.kotlinx.dataframe.impl.schema.DataFrameSchemaImpl
import org.jetbrains.kotlinx.dataframe.io.db.DbType
import org.jetbrains.kotlinx.dataframe.io.db.extractDBTypeFromUrl
import org.jetbrains.kotlinx.dataframe.schema.ColumnSchema
import org.jetbrains.kotlinx.dataframe.schema.DataFrameSchema
import kotlin.reflect.KType
import kotlin.reflect.full.createType
import kotlin.reflect.full.isSupertypeOf
Expand Down Expand Up @@ -296,7 +297,7 @@ public fun DataFrame.Companion.readAllSqlTables(
if (!dbType.isSystemTable(table)) {
// we filter her second time because of specific logic with SQLite and possible issues with future databases
// val tableName = if (table.catalogue != null) table.catalogue + "." + table.name else table.name
val tableName = if (catalogue != null) catalogue + "." + table.name else table.name
val tableName = getTableName(catalogue, table, dbType)

// TODO: both cases is schema specified or not in URL
// in h2 database name is recognized as a schema name https://www.h2database.com/html/features.html#database_url
Expand All @@ -305,6 +306,7 @@ public fun DataFrame.Companion.readAllSqlTables(
logger.debug { "Reading table: $tableName" }

val dataFrame = readSqlTable(connection, tableName, limit)

dataFrames += dataFrame
logger.debug { "Finished reading table: $tableName" }
}
Expand All @@ -313,6 +315,14 @@ public fun DataFrame.Companion.readAllSqlTables(
return dataFrames
}

private fun getTableName(catalogue: String?, table: TableMetadata, dbType: DbType) =
catalogue
?.let { catalogue + "." + table.name }
?: when (dbType) {
Vertica -> "${table.schemaName}.${table.name}" // Vertica needs schema name
else -> table.name
}

/**
* Retrieves the schema for an SQL table using the provided database configuration.
*
Expand Down Expand Up @@ -642,7 +652,7 @@ private fun generateKType(dbType: DbType, tableColumnMetadata: TableColumnMetada
* @param tableColumnMetadata The metadata of the table column.
* @return The KType associated with the SQL type, or a default type if no mapping is found.
*/
private fun makeCommonSqlToKTypeMapping(tableColumnMetadata: TableColumnMetadata): KType {
public fun makeCommonSqlToKTypeMapping(tableColumnMetadata: TableColumnMetadata): KType {
val jdbcTypeToKTypeMapping = mapOf(
Types.BIT to Boolean::class,
Types.TINYINT to Int::class,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -229,10 +229,38 @@ class MySqlTest {
@Language("SQL")
val insertData2 = """
INSERT INTO table2 (
bitCol, tinyintCol, smallintCol, mediumintCol, mediumintUnsignedCol, integerCol, intCol,
integerUnsignedCol, bigintCol, floatCol, doubleCol, decimalCol, dateCol, datetimeCol, timestampCol,
timeCol, yearCol, varcharCol, charCol, binaryCol, varbinaryCol, tinyblobCol, blobCol,
mediumblobCol, longblobCol, textCol, mediumtextCol, longtextCol, enumCol, setCol, location, data
bitCol,
tinyintCol,
smallintCol,
mediumintCol,
mediumintUnsignedCol,
integerCol,
intCol,
integerUnsignedCol,
bigintCol,
floatCol,
doubleCol,
decimalCol,
dateCol,
datetimeCol,
timestampCol,
timeCol,
yearCol,
varcharCol,
charCol,
binaryCol,
varbinaryCol,
tinyblobCol,
blobCol,
mediumblobCol,
longblobCol,
textCol,
mediumtextCol,
longtextCol,
enumCol,
setCol,
location,
data
) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ST_GeomFromText('POINT(1 1)'), ?)
""".trimIndent()

Expand Down