DEV Community

anil
anil

Posted on

Kafka JDBC Connector

Hi,

I am working on Kafka Oracle to Postgress DB Data Migration.

Oracle Table

Name Null? Type

CHECKNO NUMBER(5)
APPLICATION_NUMBER NOT NULL VARCHAR2(15)
CUSTOMERID NOT NULL VARCHAR2(10)
PURCHASE_ORDERNO NOT NULL VARCHAR2(15)
MATERIALNO NOT NULL VARCHAR2(15)
ORDER_TYPE NOT NULL VARCHAR2(6)
SUPPLIER NOT NULL VARCHAR2(14)
UNIT NOT NULL NUMBER(6)
DATETIME TIMESTAMP(6)
ADDRESS VARCHAR2(15)
MOBILENO NOT NULL NUMBER(10)
EMAIL_ADDRESS VARCHAR2(15)

I am executing through standalone Kafka JDBC connector and getting below errors.

[2024-02-05 20:17:33,624] ERROR [Oracle-Source-Kafka|task-0] Non-transient SQL exception while running query for table: TimestampIncrementingTableQuerier{table=null, query=‘SELECT * FROM (select APPLICATION_NUMBER,CUSTOMERID,PURCHASE_ORDERNO,MATERIALNO,ORDER_TYPE,SUPPLIER,UNIT,DATETIME,ADDRESS,MOBILENO,EMAIL_ADDRESS from salesorder) AS SALESORDER’, topicPrefix=‘’, incrementingColumn=‘checkno’, timestampColumns=[DATETIME]} (io.confluent.connect.jdbc.source.JdbcSourceTask:470)
java.sql.SQLSyntaxErrorException: ORA-00933: SQL command not properly ended

at oracle.jdbc.driver.T4CTTIoer11.processError(T4CTTIoer11.java:630)
at oracle.jdbc.driver.T4CTTIoer11.processError(T4CTTIoer11.java:564)
at oracle.jdbc.driver.T4C8Oall.processError(T4C8Oall.java:1231)
at oracle.jdbc.driver.T4CTTIfun.receive(T4CTTIfun.java:772)
at oracle.jdbc.driver.T4CTTIfun.doRPC(T4CTTIfun.java:299)
at oracle.jdbc.driver.T4C8Oall.doOALL(T4C8Oall.java:512)
at oracle.jdbc.driver.T4CPreparedStatement.doOall8(T4CPreparedStatement.java:163)
at oracle.jdbc.driver.T4CPreparedStatement.executeForDescribe(T4CPreparedStatement.java:1010)
at oracle.jdbc.driver.OracleStatement.prepareDefineBufferAndExecute(OracleStatement.java:1271)
at oracle.jdbc.driver.OracleStatement.executeMaybeDescribe(OracleStatement.java:1149)
at oracle.jdbc.driver.OracleStatement.executeSQLSelect(OracleStatement.java:1661)
at oracle.jdbc.driver.OracleStatement.doExecuteWithTimeout(OracleStatement.java:1470)
at oracle.jdbc.driver.OraclePreparedStatement.executeInternal(OraclePreparedStatement.java:3761)
at oracle.jdbc.driver.OraclePreparedStatement.executeQuery(OraclePreparedStatement.java:3936)
at oracle.jdbc.driver.OraclePreparedStatementWrapper.executeQuery(OraclePreparedStatementWrapper.java:1102)
at io.confluent.connect.jdbc.source.TimestampIncrementingTableQuerier.executeQuery(TimestampIncrementingTableQuerier.java:213)
at io.confluent.connect.jdbc.source.TimestampIncrementingTableQuerier.maybeStartQuery(TimestampIncrementingTableQuerier.java:164)
at io.confluent.connect.jdbc.source.JdbcSourceTask.poll(JdbcSourceTask.java:436)
at org.apache.kafka.connect.runtime.AbstractWorkerSourceTask.poll(AbstractWorkerSourceTask.java:469)
at org.apache.kafka.connect.runtime.AbstractWorkerSourceTask.execute(AbstractWorkerSourceTask.java:357)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:204)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:259)
at org.apache.kafka.connect.runtime.AbstractWorkerSourceTask.run(AbstractWorkerSourceTask.java:77)
at org.apache.kafka.connect.runtime.isolation.Plugins.lambda$withClassLoader$1(Plugins.java:236)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:572)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:317)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1144)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:642)
at java.base/java.lang.Thread.run(Thread.java:1583)
Enter fullscreen mode Exit fullscreen mode

Caused by: Error : 933, Position : 162, Sql = SELECT * FROM (select APPLICATION_NUMBER,CUSTOMERID,PURCHASE_ORDERNO,MATERIALNO,ORDER_TYPE,SUPPLIER,UNIT,DATETIME,ADDRESS,MOBILENO,EMAIL_ADDRESS from salesorder) AS SALESORDER WHERE “DATETIME”**_

  • < :1 AND ((“DATETIME” = :2 AND “checkno” > :3 ) OR “DATETIME” > :4 ) _** ORDER BY “DATETIME”,“checkno” ASC, OriginalSql = SELECT * FROM (select APPLICATION_NUMBER,CUSTOMERID,PURCHASE_ORDERNO,MATERIALNO,ORDER_TYPE,SUPPLIER,UNIT,DATETIME,ADDRESS,MOBILENO,EMAIL_ADDRESS from salesorder) AS SALESORDER WHERE “DATETIME” < ? AND ((“DATETIME” = ? AND “checkno” > ?) OR “DATETIME” > ?) ORDER BY “DATETIME”,“checkno” ASC, Error Msg = ORA-00933: SQL command not properly ended

Executed below command

c:\kafka\bin\windows>connect-standalone.bat …..\config\connect-standalone.properties …..\config\jdbc-source.json

Connect-standalone property file *

bootstrap.servers=localhost:9092

key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.storage.StringConverter
it to

key.converter.schemas.enable=true
value.converter.schemas.enable=true

offset.storage.file.filename=/tmp/connect.offsets
Flush much faster than normal, which is useful for testing/debugging

offset.flush.interval.ms=10000
plugin.path=C:\kafka\libs

**Worker source property File **

{
name=jdbc_source_connector_oracle_01
config={
name=Oracle-Source-Kafka
connector.class=io.confluent.connect.jdbc.JdbcSourceConnector
task.max=1

key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.storage.StringConverter
key.converter.schemas.enable=false
value.converter.schemas.enable=false

// transforms=createKey,setSchema
// transforms.createKey.type=org.apache.kafka.connect.transforms.ValueToKey
// transforms.createKey.fields=Unit,DATETIME
// transforms.setSchema.type=org.apache.kafka.connect.transforms.SetSchemaMetadata$Value
// transforms.setSchema.schema.name=SalesOrderRecords

//JDBC Source Connectors specific properties

connection.url=jdbc:oracle:thin:@localhost:1521/orcl
connection.user=
connection.password=
dialect.name=OracleDatabaseDialect

mode=timestamp+incrementing
incrementing.column.name=checkno
timestamp.column.name=DATETIME

numeric.precision.mapping=true
numeric.mapping=best_fit

query=SELECT * FROM (select APPLICATION_NUMBER,CUSTOMERID,PURCHASE_ORDERNO,MATERIALNO,ORDER_TYPE,SUPPLIER,UNIT,DATETIME,ADDRESS,MOBILENO,EMAIL_ADDRESS from salesorder) AS SALESORDER
table.type=TABLE

poll.interval=5000
batch.max.rows=2
topics=sorder
db.timezone=Asia/Kolkata
}
}

how, we are passing these values below, pleaselet us know.

  • < :1 AND ((“DATETIME” = :2 AND “checkno” > :3 ) OR “DATETIME” > :4 )

while executing, custom function where cluase adding. I am not getting how values needs to added below. I am having some doubts and issue not resolving, Please help here.

1). where we can give input information/is it coming runtime, (“?”), how it is taking input values
2). how to resolve SQL Command not properly ended
3). I am inserting data from one database and one simple query, please let me know, can we do multiple statements in single query
4). can we use multiple query in same source/sink connector property files.
Please help to resolve above

Top comments (0)