Cloud Pub/Sub ack deadline expiration with streaming pull subscription

Hi,

I am working on a Cloud Pub/Sub process and I am facing some ack deadline expirations. Here are the settings I have:

The pull subscription is defined with terraform with following properties:

  • enable_exactly_once_delivery = true

  • ack_deadline_seconds = 600

I subscribe to the subscription in Python using the Google Cloud client library with the streaming pull method. I am following the same code as shown here:

https://cloud.google.com/pubsub/docs/pull#streamingpull_and_high-level_client_library_code_samples
with specific properties:

from concurrent import futures

from google.cloud import pubsub_v1
from google.cloud.pubsub_v1.subscriber import scheduler as sub_scheduler

executor = futures.ThreadPoolExecutor(max_workers=25)
scheduler = sub_scheduler.ThreadScheduler(executor)

flow_control = pubsub_v1.types.FlowControl(
    max_messages=25,
    min_duration_per_lease_extension=600,  # seconds
)

streaming_pull_future = subscriber.subscribe(
    subscription_path,
    callback=callback,
    scheduler=scheduler,
    flow_control=flow_control,
    await_callbacks_on_shutdown=True,
)
print(f"Listening for messages on {subscription_path}..")

with subscriber:
    try:
        streaming_pull_future.result(timeout=None)
    except TimeoutError:
        streaming_pull_future.cancel()
        streaming_pull_future.result()

Every message is processed within 5 minutes, well below the 600-second expiration deadline defined in Terraform.
I should add that message processing is I/O bound and that only about 40% of my CPU is used.

Playing with different settings, I still face some ack deadline expirations, which lead to duplicates.

I tried many different combinations without any improvement:

  • I modified min_duration_per_lease_extension and max_duration_per_lease_extension
  • I tried calling message.modify_ack_deadline(seconds=600) before processing the message
  • I lowered max_messages in the flow control while keeping the same number of threads

Here is my callback function, in case it affects the ack:

import time

from google.cloud.pubsub_v1.subscriber import exceptions as sub_exceptions
from google.cloud.pubsub_v1.subscriber import message as sub_message


def callback(message: sub_message.Message) -> None:
    """
    Call back function when receiving a message from the subscriber.

    Args:
        message (sub_message.Message): a message containing data.
    """
    print("Received one message.")
    t1 = time.time()
    try:
        treat_message(message.data)
        ack_or_nack_future = message.ack_with_response()
    except Exception as e:
        print(f"An error occurred while processing message: {e}")
        ack_or_nack_future = message.nack_with_response()

    try:
        ack_or_nack_future.result(timeout=10.0)
        print(f"Ack for message {message.message_id} successful.")
    except sub_exceptions.AcknowledgeError as e:
        print(f"Ack for message {message.message_id} failed with error: {e.error_code}")
    print("Message treated in", time.time() - t1, "seconds.")

I spent time on the documentation. From what I understood, some client libraries (like Python's) automatically call modify_ack_deadline in the background to extend the deadline, and the new deadline should fall between min_duration_per_lease_extension and max_duration_per_lease_extension, counted from the moment the method is called. Is that right?

I also see in the console that the acknowledgment deadline defined on the subscription is not used by the high-level client libraries (such as Python's): “When using the high-level client libraries, the lease management configuration is used in place of this acknowledge deadline.”
Does that mean the acknowledgement deadline could be set to 10 seconds without affecting the behavior of the process?
Given all this, I don’t understand why these deadline expirations occur.

Do you have any idea?

Thank you all

It seems like you’re encountering a few key challenges:

  • ACK Deadline Expiration: Despite each message being processed in under 5 minutes, comfortably within the 600-second deadline, you’re still seeing ACK deadlines expire. This is causing unwanted message redelivery and duplicates.

  • Lease Management vs. ACK Deadline: You’re right; the Python client library’s lease management feature overrides the ACK deadline you set in Terraform. This means your ack_deadline_seconds = 600 isn’t directly controlling the expiration.

  • Automatic Extensions: The client library should be automatically extending the ACK deadline for messages under lease. However, something isn’t working as expected.

Here are a few suggestions and potential areas to investigate to resolve the acknowledgment deadline expiration issues you’re facing:

  1. Increase Logging:

    Modify your callback function to log more details. Include:

    • Start time of message processing
    • End time of message processing
    • ACK/NACK status (success/failure)
    • Timestamps of any lease extension attempts (if you can capture them)
  2. Monitor Pub/Sub Metrics:

    In the Cloud Monitoring console, closely watch the following metrics for your subscription:

    • pubsub.googleapis.com/subscription/ack_message_count
    • pubsub.googleapis.com/subscription/expired_ack_deadlines_count
    • pubsub.googleapis.com/subscription/oldest_unacked_message_age

    Look for patterns:

    • Are ACK deadline exceeded counts correlated with specific times or message volumes?
    • Is the oldest unacked message age consistently high?
  3. Verify Library Behavior:

    • Double-check that you’re using the latest version of the Google Cloud Pub/Sub Python library.
    • Review the library’s source code (or documentation) to understand exactly how it handles lease extensions in the presence of exactly_once_delivery = true.
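
To make suggestion 1 concrete, here is a minimal sketch of an instrumented callback. It assumes the same shape as your snippet (a treat_message function, stubbed here, and a message object exposing the ack_with_response() interface); the logger calls replace your print statements so every line carries a timestamp:

```python
# Sketch of suggestion 1 (illustrative, not your exact code): a callback
# that logs timestamps and the ack outcome for every message.
import logging
import time

logging.basicConfig(
    level=logging.INFO, format="%(asctime)s %(levelname)s %(message)s"
)
log = logging.getLogger("subscriber")


def treat_message(data: bytes) -> None:
    """Stand-in for your real processing function."""


def callback(message) -> None:
    # `message` is expected to expose the same interface as
    # google.cloud.pubsub_v1.subscriber.message.Message.
    start = time.monotonic()
    log.info("Start processing message %s", message.message_id)
    try:
        treat_message(message.data)
        future = message.ack_with_response()
        outcome = "ack"
    except Exception:
        log.exception("Processing failed for message %s", message.message_id)
        future = message.nack_with_response()
        outcome = "nack"
    try:
        future.result(timeout=10.0)
        log.info("%s for message %s succeeded", outcome, message.message_id)
    except Exception as exc:  # AcknowledgeError in the real client
        log.error("%s for message %s failed: %s", outcome, message.message_id, exc)
    log.info(
        "Message %s handled in %.1f s", message.message_id, time.monotonic() - start
    )
```

Correlating these timestamps with the subscription metrics should show whether expirations line up with slow messages or with failed ack/modack calls.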

Potential Causes and Solutions

Here are some possible explanations for the expiration, along with fixes:

  1. Flow Control:

    • Your max_messages (25) exactly matches your max_workers (25), so there is no headroom: if any worker stalls, newly leased messages wait in the client buffer with their lease clock already running. Try reducing max_messages below the worker count (e.g., 10) and see if it improves.
  2. Long-Running Tasks in Callback:

    • Even though your typical processing time is under 5 minutes, do any individual messages occasionally take much longer (slow network requests, database operations, retries)? A single outlier can outlive the lease and trigger expiration. Logging per-message durations will catch these; consider moving unusually long-running tasks to a separate thread or process.
  3. Exactly-Once Delivery Behavior:

    • The exactly_once_delivery flag tightens lease management: acks and lease extensions become tracked RPCs that can fail transiently, and a single late or failed extension lets the lease expire. With min_duration_per_lease_extension=600, every extension is requested at the 600 s maximum, which leaves no slack. Experiment with lower values for min_duration_per_lease_extension and max_duration_per_lease_extension (so the library extends more frequently), or consider disabling exactly_once_delivery if it’s not strictly necessary.
  4. Resource Limitations:

    • Since your processing is I/O bound and CPU sits at only 40%, CPU is unlikely to be the limit, but check memory and network: the lease-extension RPCs travel over the same network as your workload, and a saturated connection can delay them long enough for leases to expire.
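
On points 1 and 3, it may help to see how the Python client chooses each lease extension. As I understand it (a rough sketch, not the library’s actual code), it tracks the 99th percentile of observed ack latencies and clamps it by your FlowControl settings and by Pub/Sub’s hard 10–600 s range, which means min_duration_per_lease_extension=600 pins every extension at the maximum and leaves no slack if a single modack RPC is late:

```python
# Rough sketch of the lease-extension clamping logic (illustrative only).
from typing import Optional

MIN_ACK_DEADLINE = 10   # Pub/Sub hard minimum, seconds
MAX_ACK_DEADLINE = 600  # Pub/Sub hard maximum, seconds


def next_lease_extension(
    p99_ack_latency: float,
    min_duration: Optional[float],
    max_duration: Optional[float],
) -> float:
    """Pick the next lease extension from observed ack latency and settings."""
    deadline = p99_ack_latency
    if min_duration is not None:
        deadline = max(deadline, min_duration)
    if max_duration is not None:
        deadline = min(deadline, max_duration)
    # Whatever the settings say, Pub/Sub only accepts 10-600 seconds.
    return min(max(deadline, MIN_ACK_DEADLINE), MAX_ACK_DEADLINE)


# With your settings, every extension is pinned at the 600 s ceiling:
print(next_lease_extension(300.0, 600, None))  # -> 600
# With the defaults, the library extends to roughly the observed latency:
print(next_lease_extension(300.0, None, None))  # -> 300.0
```

A lower min_duration_per_lease_extension gives the library room to extend more often with shorter deadlines, so one delayed extension RPC is less likely to be fatal.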

Important Considerations

  • ACK Deadline vs. Lease Extension: Remember that the ack deadline in your Terraform configuration only sets the initial deadline; once the streaming client attaches, its lease management takes over and keeps extending the lease. Treat it as a fallback for when lease management fails, and focus on configuring lease management properly.

  • Error Handling: Ensure you have robust error handling in your callback. Any unhandled exceptions can prevent ACKs from being sent, leading to expirations.
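
To make that last point concrete, one pattern (names here are illustrative) is to wrap processing so that every exit path explicitly acks or nacks, so an unhandled exception can never leave a message leased until its deadline expires. With exactly-once delivery you would use ack_with_response()/nack_with_response() as in your snippet:

```python
# Illustrative pattern: every code path ends in an explicit ack or nack.
def handle(message, process) -> str:
    """Run `process` on the message and return the decision that was sent."""
    try:
        process(message.data)
    except Exception:
        # On any failure, hand the message back immediately instead of
        # holding the lease until it expires.
        message.nack()
        return "nack"
    message.ack()
    return "ack"
```

The return value also makes the decision easy to log, which pairs well with the extra logging suggested above.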