Hi,
I am working on a Cloud Pub/Sub process and I am running into ack deadline expirations. Here are my settings:
The pull subscription is defined in Terraform with the following properties:
- enable_exactly_once_delivery = true
- ack_deadline_seconds = 600
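For anyone who creates subscriptions from Python rather than Terraform, the same two properties correspond to fields on the Pub/Sub `Subscription` resource. The sketch below only builds the request body that could be passed to `SubscriberClient.create_subscription(request=...)`. The project, topic and subscription IDs are hypothetical placeholders, and the network call is left commented out so the snippet runs offline.

```python
# Sketch: the Terraform subscription settings expressed as a create_subscription
# request body. Project, topic and subscription IDs are hypothetical placeholders.


def build_subscription_request(project_id: str, topic_id: str, subscription_id: str) -> dict:
    """Return a request dict mirroring the two Terraform properties."""
    return {
        "name": f"projects/{project_id}/subscriptions/{subscription_id}",
        "topic": f"projects/{project_id}/topics/{topic_id}",
        "ack_deadline_seconds": 600,  # ack_deadline_seconds in Terraform
        "enable_exactly_once_delivery": True,  # enable_exactly_once_delivery in Terraform
    }


request = build_subscription_request("my-project", "my-topic", "my-subscription")

# With google-cloud-pubsub installed and credentials configured, this would be:
# from google.cloud import pubsub_v1
# with pubsub_v1.SubscriberClient() as subscriber:
#     subscriber.create_subscription(request=request)
print(request["ack_deadline_seconds"], request["enable_exactly_once_delivery"])
```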
I subscribe in Python with the standard client library using streaming pull, following the sample code here:
https://cloud.google.com/pubsub/docs/pull#streamingpull_and_high-level_client_library_code_samples
with these specific settings:
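```python
from concurrent import futures
from concurrent.futures import TimeoutError

from google.cloud import pubsub_v1
from google.cloud.pubsub_v1.subscriber import scheduler as sub_scheduler

# Placeholders: replace with the real project and subscription IDs.
subscriber = pubsub_v1.SubscriberClient()
subscription_path = subscriber.subscription_path("my-project", "my-subscription")

# 25 worker threads run the callbacks; flow control allows 25 outstanding messages.
executor = futures.ThreadPoolExecutor(max_workers=25)
scheduler = sub_scheduler.ThreadScheduler(executor)
flow_control = pubsub_v1.types.FlowControl(
    max_messages=25,
    min_duration_per_lease_extension=600,
)
# `callback` is the callback function shown later in this post.
streaming_pull_future = subscriber.subscribe(
    subscription_path,
    callback=callback,
    scheduler=scheduler,
    flow_control=flow_control,
    await_callbacks_on_shutdown=True,
)
print(f"Listening for messages on {subscription_path}..")
with subscriber:
    try:
        # Blocks indefinitely; with timeout=None the TimeoutError branch is never hit.
        streaming_pull_future.result(timeout=None)
    except TimeoutError:
        streaming_pull_future.cancel()
        streaming_pull_future.result()
```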
Every message is processed within 5 minutes, which is below the 600-second ack deadline defined in Terraform.
I should point out that message processing is I/O-bound and only about 40% of my CPU is used.
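Low CPU usage with a thread pool is what one would expect for I/O-bound work: Python threads release the GIL while they block on I/O, so many callbacks can be in flight at once. The toy sketch below stands in for `treat_message` with `time.sleep` (an assumption; real network I/O may behave differently) and shows 25 waits overlapping in a 25-worker `ThreadPoolExecutor`, mirroring `max_workers=25`.

```python
import time
from concurrent import futures


def fake_io_task(duration: float) -> float:
    """Hypothetical stand-in for an I/O-bound treat_message call."""
    time.sleep(duration)  # sleeping releases the GIL, like a blocking network call
    return duration


def run_batch(n_tasks: int, workers: int, duration: float) -> float:
    """Run n_tasks sleeps on a thread pool and return the wall-clock time taken."""
    start = time.monotonic()
    with futures.ThreadPoolExecutor(max_workers=workers) as executor:
        list(executor.map(fake_io_task, [duration] * n_tasks))
    return time.monotonic() - start


elapsed = run_batch(n_tasks=25, workers=25, duration=0.2)
# Typically close to 0.2 s rather than the 25 * 0.2 = 5 s a serial loop would need.
print(f"25 tasks of 0.2 s each took {elapsed:.2f} s")
```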
Despite trying different settings, I still see some ack deadline expirations, which lead to duplicate deliveries.
None of the following changes made a difference; the expirations persist:
- I adjusted min_duration_per_lease_extension and max_duration_per_lease_extension.
- I tried calling message.modify_ack_deadline(seconds=600) before processing the message.
- I lowered max_messages in the flow control settings while keeping the same number of threads.
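One thing the max_messages/threads change probes is time a message spends waiting inside the client before its callback starts: an ack deadline is counted from delivery (or the most recent deadline modification), not from when `treat_message` begins. The back-of-the-envelope model below assumes up to `max_messages` messages are outstanding, callbacks run in waves of `max_workers`, and every callback takes the stated maximum time. It ignores ack round trips and lease extensions, so it is a simplification, not a description of the client library's internals.

```python
import math


def worst_case_hold_time(max_messages: int, max_workers: int, max_processing_s: float) -> float:
    """Upper bound on delivery-to-ack time under a simple waves-of-workers model.

    Assumes up to max_messages are outstanding at once, at most max_workers
    callbacks run concurrently, and every callback takes max_processing_s.
    Ack latency and lease extensions are ignored.
    """
    waves = math.ceil(max_messages / max_workers)
    return waves * max_processing_s


# Settings from the question: 25 messages, 25 threads, up to 5 minutes per message.
print(worst_case_hold_time(25, 25, 300))  # 300
# If max_messages exceeded the pool size, later messages would queue:
print(worst_case_hold_time(50, 25, 300))  # 600
```

With the settings in the question this model stays at 300 s, so under these assumptions queueing alone would not push messages past a 600-second deadline.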
Here is my callback function, in case it affects the ack:
```python
import time

from google.cloud.pubsub_v1.subscriber import exceptions as sub_exceptions
from google.cloud.pubsub_v1.subscriber import message as sub_message


def callback(message: sub_message.Message) -> None:
    """
    Callback invoked for each message received from the subscriber.
    Args:
        message (sub_message.Message): a message containing data.
    """
    print("Received one message.")
    t1 = time.time()
    try:
        treat_message(message.data)  # application-specific processing, defined elsewhere
        ack_or_nack_future = message.ack_with_response()
    except Exception as e:
        # print() does not do %-formatting, so use an f-string.
        print(f"An error occurred while processing message: {e}")
        ack_or_nack_future = message.nack_with_response()
    try:
        # With exactly-once delivery, this future reports whether the ack/nack succeeded.
        ack_or_nack_future.result(timeout=10.0)
        print(f"Ack for message {message.message_id} successful.")
    except sub_exceptions.AcknowledgeError as e:
        print(f"Ack for message {message.message_id} failed with error: {e.error_code}")
    print("Message treated in", time.time() - t1, "seconds.")
```
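To check the callback's ack/nack branching without a live subscription, one option is to drive it with a stand-in object. `FakeMessage` below is hypothetical: it only imitates the `message_id`, `data`, `ack_with_response()` and `nack_with_response()` members the callback uses, returning already-completed `concurrent.futures.Future` objects. `handle` is a condensed copy of the callback with `treat_message` passed in; the `AcknowledgeError` branch is left out because it needs the real library.

```python
from concurrent.futures import Future
from typing import Callable, Optional


class FakeMessage:
    """Hypothetical stand-in for pubsub_v1.subscriber.message.Message."""

    def __init__(self, message_id: str, data: bytes) -> None:
        self.message_id = message_id
        self.data = data
        self.outcome: Optional[str] = None  # records "ack" or "nack"

    def _done(self, outcome: str) -> Future:
        self.outcome = outcome
        future: Future = Future()
        future.set_result(None)  # pretend the server confirmed the operation
        return future

    def ack_with_response(self) -> Future:
        return self._done("ack")

    def nack_with_response(self) -> Future:
        return self._done("nack")


def handle(message: FakeMessage, treat_message: Callable[[bytes], None]) -> None:
    """Condensed copy of the callback: ack on success, nack on any exception."""
    try:
        treat_message(message.data)
        ack_or_nack_future = message.ack_with_response()
    except Exception as e:
        print(f"An error occurred while processing message: {e}")
        ack_or_nack_future = message.nack_with_response()
    ack_or_nack_future.result(timeout=10.0)


def failing_treatment(data: bytes) -> None:
    raise ValueError("boom")


ok_msg, bad_msg = FakeMessage("1", b"hello"), FakeMessage("2", b"oops")
handle(ok_msg, lambda data: None)
handle(bad_msg, failing_treatment)
print(ok_msg.outcome, bad_msg.outcome)  # ack nack
```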
I spent some time reading the documentation. From what I understand, some client libraries (such as Python's) automatically call modify_ack_deadline in the background to extend the deadline, and each new deadline falls between min_duration_per_lease_extension and max_duration_per_lease_extension, counted from the moment that call is made. Is that right?
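The understanding described above can be written down as a small model. The sketch assumes each automatic extension starts from an estimate (for example, recent processing times), clamps it to `[min_duration_per_lease_extension, max_duration_per_lease_extension]`, and counts the result from the moment the extension is sent; it also applies the 10 to 600 second range Pub/Sub accepts for ack deadlines. Treating 0 as "bound not set" is an assumption of this model. It illustrates the question's reading, not the client library's actual algorithm.

```python
PUBSUB_MIN_ACK_DEADLINE_S = 10   # lowest ack deadline Pub/Sub accepts
PUBSUB_MAX_ACK_DEADLINE_S = 600  # highest ack deadline Pub/Sub accepts


def extension_deadline(estimate_s: float, min_ext_s: float = 0, max_ext_s: float = 0) -> float:
    """Clamp an estimated processing time to the configured lease-extension bounds.

    A bound of 0 is treated as "not set" (an assumption of this model).
    """
    deadline = estimate_s
    if max_ext_s > 0:
        deadline = min(deadline, max_ext_s)
    if min_ext_s > 0:
        deadline = max(deadline, min_ext_s)
    # Whatever the settings, the value sent must fall within Pub/Sub's range.
    return max(PUBSUB_MIN_ACK_DEADLINE_S, min(deadline, PUBSUB_MAX_ACK_DEADLINE_S))


def expires_at(extension_sent_at_s: float, deadline_s: float) -> float:
    """In this model the new deadline is counted from when the extension is sent."""
    return extension_sent_at_s + deadline_s


# The question's setting: min_duration_per_lease_extension=600.
d = extension_deadline(estimate_s=120, min_ext_s=600)
print(d, expires_at(extension_sent_at_s=1000.0, deadline_s=d))  # 600 1600.0
```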
I also noticed in the console that the subscription's acknowledgement deadline is apparently not used by high-level client libraries such as Python's: “When using the high-level client libraries, the lease management configuration is used in place of this acknowledge deadline.”
Does this mean the acknowledgement deadline could be set to 10 seconds without affecting the behavior of the process?
Given all this, I don’t understand why these ack deadline expirations occur.
Does anyone have an idea?
Thank you all