6

I am using PyFlink 1.17.1 and I am getting this error: "RuntimeError: java.lang.UnsupportedOperationException: A serializer has already been registered for the state; re-registration is not allowed". I need your help with this. When I try to sink the data using the JDBC sink, it fails with this error, but when I simply print the stream, it works as expected.

Here is main.py:

# PostgreSQL settings. Every value comes from the external `config` object;
# the lookups run in the same order as before (host, port, db, user,
# password, table).
db_driver_name = "org.postgresql.Driver"
db_host, db_port, db_name = (
    config.get(setting)
    for setting in ("POSTGRES_HOST", "POSTGRES_PORT", "POSTGRES_DB")
)
db_username, db_password, db_table = (
    config.get(setting)
    for setting in ("POSTGRES_USER", "POSTGRES_PASSWORD", "POSTGRES_TABLE")
)

# JDBC URL handed to the Flink JDBC connector further down.
db_url = f"jdbc:postgresql://{db_host}:{db_port}/{db_name}"

# Project-level connection object built from the same settings.
db_connection = Connection(
    host=db_host,
    port=db_port,
    database=db_name,
    username=db_username,
    password=db_password,
)

# Streaming execution environment shared by the whole job.
env = StreamExecutionEnvironment.get_execution_environment()

# Register the Kafka connector, the JDBC connector and the PostgreSQL driver
# as absolute file:// URLs so the JVM side can load them.
env.add_jars(
    *(
        f"file://{abspath(jar_path)}"
        for jar_path in (
            "connector_jars/flink-sql-connector-kafka-1.17.1.jar",
            "connector_jars/flink-connector-jdbc-3.1.1-1.17.jar",
            "connector_jars/postgresql-42.6.0.jar",
        )
    )
)

# Job-wide settings: streaming mode, event-time semantics, parallelism 8.
env.set_runtime_mode(RuntimeExecutionMode.STREAMING)
env.set_stream_time_characteristic(TimeCharacteristic.EventTime)
env.set_parallelism(8)

# Row schema of the JSON messages read from Kafka. Each field is declared as
# a (name, type) pair; zip(*pairs) then splits the pairs into the two
# parallel lists (names, types) that Types.ROW_NAMED expects.
kafka_source_type_info = Types.ROW_NAMED(
    *(
        list(column)
        for column in zip(
            ("open_time", Types.LONG()),
            ("open_price", Types.DOUBLE()),
            ("high_price", Types.DOUBLE()),
            ("low_price", Types.DOUBLE()),
            ("close_price", Types.DOUBLE()),
            ("volume", Types.DOUBLE()),
            ("close_time", Types.LONG()),
            ("base_asset_value", Types.DOUBLE()),
            ("number_of_trades", Types.INT()),
            ("taker_buy_base_asset_volume", Types.DOUBLE()),
            ("taker_buy_quote_asset_volume", Types.DOUBLE()),
            ("ignore_value", Types.INT()),
            ("symbol", Types.STRING()),
            ("x", Types.STRING()),
        )
    )
)

# JSON -> Row deserializer driven by the schema above.
deserialization_schema = (
    JsonRowDeserializationSchema.Builder()
    .type_info(kafka_source_type_info)
    .build()
)

# Start consuming from records produced during the last 24 hours.
# KafkaOffsetsInitializer.timestamp() expects epoch *milliseconds*, while
# datetime.timestamp() returns seconds, so the value is scaled by 1000.
# Passing seconds would resolve to a date in January 1970, i.e. effectively
# the earliest available offset.
kafka_timestamp = int((datetime.now() - timedelta(days=1)).timestamp() * 1000)

kafka_brokers = config.get("KAFKA_BROKER")

# Value-only Kafka source: each record value is decoded by
# `deserialization_schema`.
kafka_source = (
    KafkaSource.builder()
    .set_bootstrap_servers(kafka_brokers)
    .set_topics(config.get("KAFKA_TOPIC"))
    .set_starting_offsets(KafkaOffsetsInitializer.timestamp(kafka_timestamp))
    .set_value_only_deserializer(deserialization_schema)
    .build()
)

# Watermarks assume monotonously increasing event timestamps.
watermark_strategy = WatermarkStrategy.for_monotonous_timestamps()
# Event time is extracted by the project's KlineTimestampAssigner.
watermark_strategy = watermark_strategy.with_timestamp_assigner(
    KlineTimestampAssigner()
)
# A source split with no records for one minute is marked idle.
watermark_strategy = watermark_strategy.with_idleness(Duration.of_minutes(1))

# JDBC sink configuration.

# Where and how to connect: URL, driver class and credentials.
jdbc_connection_options = JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
jdbc_connection_options = jdbc_connection_options.with_url(db_url)
jdbc_connection_options = jdbc_connection_options.with_driver_name(db_driver_name)
jdbc_connection_options = jdbc_connection_options.with_user_name(db_username)
jdbc_connection_options = jdbc_connection_options.with_password(db_password)
jdbc_connection_options = jdbc_connection_options.build()

# How to write: batch interval 1000 ms, batch size 200, max retries 5.
jdbc_execution_options = JdbcExecutionOptions.builder()
jdbc_execution_options = jdbc_execution_options.with_batch_interval_ms(1000)
jdbc_execution_options = jdbc_execution_options.with_batch_size(200)
jdbc_execution_options = jdbc_execution_options.with_max_retries(5)
jdbc_execution_options = jdbc_execution_options.build()

# Read the Kafka topic as a stream of rows and partition it by the string key
# returned by `key_by_symbol`.
ds = env.from_source(
    kafka_source,
    watermark_strategy,
    "XpertCoin RedPanda Source",
    kafka_source_type_info,
).key_by(key_by_symbol, key_type=Types.STRING())

# 5-minute interval indicators.

# Parameterised insert statement used by the JDBC sink for this stream.
query_5m = "insert into indicator_flink (symbol, exchange) values (?, ?)"

# Step 1: collapse each 5-minute tumbling event-time window to a single record
# per key via `last_element_for_record`.
sub_ds_5m = ds.window(TumblingEventTimeWindows.of(Time.minutes(5)))
sub_ds_5m = sub_ds_5m.reduce(last_element_for_record)

# Step 2: re-key the reduced stream, group every 220 records per key into a
# count window, and run the indicator function, which emits (str, str) tuples.
indicators_stream_5m = sub_ds_5m.key_by(key_by_symbol, key_type=Types.STRING())
indicators_stream_5m = indicators_stream_5m.window(
    CountTumblingWindowAssigner.of(220)
)
indicators_stream_5m = indicators_stream_5m.process(
    TechnicalIndicatorProcessFunction(),
    output_type=Types.TUPLE([Types.STRING(), Types.STRING()]),
)
indicators_stream_5m = indicators_stream_5m.uid("technical-indicators-5m")

# Write the 5-minute indicator stream to the PostgreSQL table through JDBC.
# NOTE(review): PyFlink documents the `type_info` argument of JdbcSink.sink as
# a RowTypeInfo, while a tuple type is passed here - confirm whether the
# stream should emit Row values typed with Types.ROW instead.
indicators_stream_5m.add_sink(
    JdbcSink.sink(
        query_5m,
        Types.TUPLE([Types.STRING(), Types.STRING()]),
        jdbc_connection_options,
        jdbc_execution_options,
    )
)

# Submit the job; nothing runs until this call.
env.execute()

Here is the TechnicalIndicatorProcessFunction:

class TechnicalIndicatorProcessFunction(
    ProcessWindowFunction[tuple, tuple, str, TimeWindow]
):
    """Emit one ``(symbol, "BINANCE")`` tuple for every fired window.

    Only the last record of the window is inspected; its ``"symbol"`` field is
    paired with the constant exchange name ``"BINANCE"``.

    NOTE(review): main.py applies this function to a
    ``CountTumblingWindowAssigner`` window and reads records by field name, so
    the ``tuple`` / ``TimeWindow`` type parameters above may not describe the
    actual input and window types - confirm.
    """

    def open(self, runtime_context: RuntimeContext):
        """Delegate to the base class; no extra setup is performed."""
        return super().open(runtime_context)

    def process(
        self,
        key: str,
        context: ProcessWindowFunction.Context[TimeWindow],
        elements: Iterable,
    ) -> Iterable[Tuple[str, str]]:
        """Return a one-item list built from the last record in the window.

        :param key: Key of the window being evaluated (not used).
        :param context: Window context (not used).
        :param elements: Records collected in the window.
        :return: ``[(symbol, "BINANCE")]`` for the last record, or an empty
            list when the window holds no records.
        """
        # `elements` is only guaranteed to be iterable, so walk it rather than
        # indexing with [-1]; the sentinel distinguishes "no records" from a
        # record that happens to be falsy.
        no_record = object()
        recent_record = no_record
        for recent_record in elements:
            pass

        if recent_record is no_record:
            return []

        return [
            (
                recent_record["symbol"],
                "BINANCE",
            )
        ]

    def clear(self, context: ProcessWindowFunction.Context) -> None:
        """Delegate to the base class; this function keeps no per-window state."""
        return super().clear(context)

    def close(self):
        """Delegate to the base class; no resources are held."""
        return super().close()

I am reading data from a Redpanda topic and, after some processing, I want to save that data to a table. The solution I have implemented works fine when I print the stream, but it does not work when I try to sink it using JdbcSink.

0 Answers0