I am using PyFlink 1.17.1 and I am getting this error:

RuntimeError: java.lang.UnsupportedOperationException: A serializer has already been registered for the state; re-registration is not allowed

When I try to sink data using the JDBC sink, the job fails with this error, but when I simply print the stream instead, it works as expected. I need help with this.

Here is my main.py:
from os.path import abspath
from datetime import datetime, timedelta

from pyflink.common import Duration, Types, WatermarkStrategy
from pyflink.common.time import Time
from pyflink.datastream import (
    RuntimeExecutionMode,
    StreamExecutionEnvironment,
    TimeCharacteristic,
)
from pyflink.datastream.connectors.jdbc import (
    JdbcConnectionOptions,
    JdbcExecutionOptions,
    JdbcSink,
)
from pyflink.datastream.connectors.kafka import KafkaOffsetsInitializer, KafkaSource
from pyflink.datastream.formats.json import JsonRowDeserializationSchema
from pyflink.datastream.window import CountTumblingWindowAssigner, TumblingEventTimeWindows

# config, Connection, KlineTimestampAssigner, key_by_symbol and
# last_element_for_record are defined elsewhere in my project

db_host = config.get("POSTGRES_HOST")
db_port = config.get("POSTGRES_PORT")
db_name = config.get("POSTGRES_DB")
db_url = f"jdbc:postgresql://{db_host}:{db_port}/{db_name}"
db_driver_name = "org.postgresql.Driver"
db_username = config.get("POSTGRES_USER")
db_password = config.get("POSTGRES_PASSWORD")
db_table = config.get("POSTGRES_TABLE")
db_connection = Connection(
    username=db_username,
    password=db_password,
    host=db_host,
    port=db_port,
    database=db_name,
)
env = StreamExecutionEnvironment.get_execution_environment()
env.add_jars(
    f'file://{abspath("connector_jars/flink-sql-connector-kafka-1.17.1.jar")}',
    f'file://{abspath("connector_jars/flink-connector-jdbc-3.1.1-1.17.jar")}',
    f'file://{abspath("connector_jars/postgresql-42.6.0.jar")}',
)
env.set_parallelism(8)
# deprecated since Flink 1.12; event time is the default characteristic anyway
env.set_stream_time_characteristic(TimeCharacteristic.EventTime)
env.set_runtime_mode(RuntimeExecutionMode.STREAMING)
kafka_source_type_info = Types.ROW_NAMED(
    [
        "open_time",
        "open_price",
        "high_price",
        "low_price",
        "close_price",
        "volume",
        "close_time",
        "base_asset_value",
        "number_of_trades",
        "taker_buy_base_asset_volume",
        "taker_buy_quote_asset_volume",
        "ignore_value",
        "symbol",
        "x",
    ],
    [
        Types.LONG(),
        Types.DOUBLE(),
        Types.DOUBLE(),
        Types.DOUBLE(),
        Types.DOUBLE(),
        Types.DOUBLE(),
        Types.LONG(),
        Types.DOUBLE(),
        Types.INT(),
        Types.DOUBLE(),
        Types.DOUBLE(),
        Types.INT(),
        Types.STRING(),
        Types.STRING(),
    ],
)
deserialization_schema = (
    JsonRowDeserializationSchema.Builder().type_info(kafka_source_type_info).build()
)
# KafkaOffsetsInitializer.timestamp expects epoch milliseconds, not seconds
kafka_timestamp = int((datetime.now() - timedelta(days=1)).timestamp() * 1000)
kafka_brokers = config.get("KAFKA_BROKER")
kafka_source = (
    KafkaSource.builder()
    .set_bootstrap_servers(kafka_brokers)
    .set_topics(config.get("KAFKA_TOPIC"))
    .set_starting_offsets(KafkaOffsetsInitializer.timestamp(kafka_timestamp))
    .set_value_only_deserializer(deserialization_schema)
    .build()
)
watermark_strategy = (
    WatermarkStrategy.for_monotonous_timestamps()
    .with_timestamp_assigner(KlineTimestampAssigner())
    .with_idleness(Duration.of_minutes(1))
)
# JDBC Sink
jdbc_connection_options = (
    JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
    .with_url(db_url)
    .with_driver_name(db_driver_name)
    .with_user_name(db_username)
    .with_password(db_password)
    .build()
)
jdbc_execution_options = (
    JdbcExecutionOptions.builder()
    .with_batch_interval_ms(1000)
    .with_batch_size(200)
    .with_max_retries(5)
    .build()
)
ds = env.from_source(
    source=kafka_source,
    watermark_strategy=watermark_strategy,
    source_name="XpertCoin RedPanda Source",
    type_info=kafka_source_type_info,
)
ds = ds.key_by(key_by_symbol, key_type=Types.STRING())
# calculate 5-minute interval indicators
query_5m = "insert into indicator_flink (symbol, exchange) values (?, ?)"
sub_ds_5m = ds.window(TumblingEventTimeWindows.of(Time.minutes(5))).reduce(
    last_element_for_record
)
indicators_stream_5m = (
    sub_ds_5m.key_by(key_by_symbol, key_type=Types.STRING())
    .window(CountTumblingWindowAssigner.of(220))
    .process(
        TechnicalIndicatorProcessFunction(),
        output_type=Types.TUPLE([Types.STRING(), Types.STRING()]),
    )
    .uid("technical-indicators-5m")
)
# sink indicators_stream_5m to the PostgreSQL table
indicators_stream_5m.add_sink(
    JdbcSink.sink(
        sql=query_5m,
        type_info=Types.TUPLE([Types.STRING(), Types.STRING()]),
        jdbc_connection_options=jdbc_connection_options,
        jdbc_execution_options=jdbc_execution_options,
    )
)
env.execute()
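For reference, if I swap the JDBC sink for a plain print on the same stream, the job runs without the error:

# this variant works: same pipeline, but printing instead of the JDBC sink
indicators_stream_5m.print()
env.execute()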
Here is the TechnicalIndicatorProcessFunction:
from typing import Iterable, Tuple

from pyflink.common import Row
from pyflink.datastream.functions import ProcessWindowFunction, RuntimeContext
from pyflink.datastream.window import CountWindow


# the window type parameter is CountWindow because the keyed stream
# is windowed with CountTumblingWindowAssigner, not an event-time window
class TechnicalIndicatorProcessFunction(
    ProcessWindowFunction[Row, Tuple[str, str], str, CountWindow]
):
    def open(self, runtime_context: RuntimeContext):
        return super().open(runtime_context)

    def process(
        self,
        key: str,
        context: ProcessWindowFunction.Context,
        elements: Iterable[Row],
    ) -> Iterable[Tuple[str, str]]:
        # take the most recent record in the window
        recent_record = list(elements)[-1]
        return [
            (
                recent_record["symbol"],
                "BINANCE",
            )
        ]

    def clear(self, context: ProcessWindowFunction.Context) -> None:
        return super().clear(context)

    def close(self):
        return super().close()
To summarize: I am reading data from a Redpanda topic and, after some processing, I want to save the results to a Postgres table. The pipeline works fine when I print the stream, but it fails with the error above as soon as I attach the JdbcSink.
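For completeness, here are simplified sketches of the helpers referenced in main.py (the timestamp assigner assumes open_time is already in epoch milliseconds):

from pyflink.common import Row
from pyflink.common.watermark_strategy import TimestampAssigner


def key_by_symbol(record: Row) -> str:
    # key each kline record by its trading pair
    return record["symbol"]


def last_element_for_record(first: Row, second: Row) -> Row:
    # reduce function: keep only the latest record seen in the window
    return second


class KlineTimestampAssigner(TimestampAssigner):
    def extract_timestamp(self, value: Row, record_timestamp: int) -> int:
        # use the kline open_time (epoch milliseconds) as the event timestamp
        return value["open_time"]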