Dataflow does not read Pub/Sub messages that have an ordering key set.

I want to replicate MySQL data to BigQuery in real time.

The pipeline structure is as follows:
MySQL → Flink CDC → Cloud Pub/Sub → Dataflow → BigQuery

To transfer the MySQL binlog to Pub/Sub, the Flink CDC job uses the PubSubSink connector (https://github.com/GoogleCloudPlatform/pubsub/blob/master/flink-connector/docs/content/docs/connectors/datastream/pubsub.md).

To process the binlog messages in order, I also set the ordering key based on the primary key (PK) of the MySQL table.

Next, I created a simple Dataflow app that reads messages from Pub/Sub and writes them to the log. When a row is inserted into the MySQL table, the corresponding message is logged by the Dataflow app as expected, but when that same row (same PK) is updated, the update message never shows up in Dataflow.

Also, when I restart the Dataflow app, messages that have already been processed are printed to the console again.
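For debugging, messages can also be pulled straight from the subscription with the Pub/Sub client library, bypassing Beam entirely, to check whether the update message (same ordering key) ever reaches the subscription. This is only a minimal sketch; it uses Application Default Credentials and the same project/subscription names as the Dataflow code below.

// direct pull from the subscription (no Beam), sketch for debugging only
val subscriptionName = ProjectSubscriptionName.of("project-a", "test-sub")
val receiver = MessageReceiver { message, consumer ->
    println("${message.messageId} - ${message.orderingKey} - ${message.data.toStringUtf8()}")
    consumer.ack()
}
val subscriber = Subscriber.newBuilder(subscriptionName, receiver).build()
subscriber.startAsync().awaitRunning()
Thread.sleep(60_000)  // listen for a minute, then shut down
subscriber.stopAsync().awaitTerminated()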

Why is the second message not processed when the PK (and therefore the ordering key) is the same?
And why are already-processed messages read and processed again when the Dataflow app restarts?

// Pub/Sub sink code
// tuple2.f0 = PK (used as the ordering key), tuple2.f1 = binlog message payload
return PubSubSink.builder<Tuple2<String, String>>()
            .setSerializationSchema(PubSubOrderingMessageSerializer())
            .setProjectName("project-a")
            .setTopicName("topic-a")
            .setEnableMessageOrdering(true)  // publish with ordering keys enabled
            .setCredentials(javaClass.getResourceAsStream("/service-account.json")
                .use { inputStream ->
                    ServiceAccountCredentials.fromStream(inputStream)
                })
            .build()
// PubSubOrderingMessageSerializer
class PubSubOrderingMessageSerializer : PubSubSerializationSchema<Tuple2<String, String>> {
    override fun open(ctx: SerializationSchema.InitializationContext) {
        // no initialization needed
    }

    override fun serialize(element: Tuple2<String, String>): PubsubMessage {
        return PubsubMessage
            .newBuilder()
            .setOrderingKey(element.f0)  // ordering key = PK of the MySQL row
            .setData(ByteString.copyFromUtf8(element.f1))
            .build()
    }
}
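For context, this is roughly how the sink is wired into the Flink job. buildPubSubSink() stands for the function containing the builder code above, buildBinlogStream() stands for the Flink CDC source setup (not shown), and I am assuming the connector uses Flink's unified Sink API (sinkTo):

// Flink job wiring (sketch; buildBinlogStream / buildPubSubSink are illustrative names)
val env = StreamExecutionEnvironment.getExecutionEnvironment()
// DataStream<Tuple2<String, String>> of (PK, binlog JSON) produced by the Flink CDC source
val binlogStream = buildBinlogStream(env)
binlogStream.sinkTo(buildPubSubSink())  // assuming the unified Sink API
env.execute("mysql-binlog-to-pubsub")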

// Dataflow code

val credentials = javaClass.getResourceAsStream("service-account.json").use { ServiceAccountCredentials.fromStream(it) }

val options = PipelineOptionsFactory.`as`(DirectOptions::class.java).apply {
    targetParallelism = 10
}

val gcpOptions = options.`as`(GcpOptions::class.java).apply {
    gcpCredential = credentials
}

val pipeline = Pipeline.create(options)

pipeline
    .apply("ReadFromPubSub",
        PubsubIO.readMessagesWithAttributesAndMessageIdAndOrderingKey()
            .fromSubscription("projects/project-a/subscriptions/test-sub")
    )
    .apply("WriteToConsole", ParDo.of(object : DoFn<PubsubMessage, Void>() {
        val LOG = LoggerFactory.getLogger(PubSubToBigQuerySample::class.java)

        @ProcessElement
        fun processElement(c: ProcessContext) {
            // log thread id, message id, ordering key and payload
            LOG.info("${Thread.currentThread().id} - ${c.element().messageId} - ${c.element().orderingKey} - ${c.element().payload.decodeToString()}")
        }
    }))

pipeline.run()
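For completeness: ordered delivery per ordering key also requires the subscription itself to have message ordering enabled (enable_message_ordering). A subscription like the one used above can be created roughly as follows; this is only a sketch with the project/topic/subscription names from the code above.

// create the subscription with message ordering enabled (sketch, admin client)
SubscriptionAdminClient.create().use { client ->
    client.createSubscription(
        Subscription.newBuilder()
            .setName(SubscriptionName.of("project-a", "test-sub").toString())
            .setTopic(TopicName.of("project-a", "topic-a").toString())
            .setEnableMessageOrdering(true)  // needed for per-key ordered delivery
            .setAckDeadlineSeconds(60)
            .build()
    )
}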