Using the Google BigQuery Storage Write API with high concurrency

I have a multi-tenant application with separate BigQuery tables for each tenant (about 30 per tenant). We use BigQuery for creating reports of events, changes, etc. We're currently using the legacy streaming insert in Python (Django), but we are hitting the request quota.

We are considering using the Storage Write API because it has better performance. I’m planning to use a flow like this:

# Most of the time `rows` will contain a single row
from google.cloud import bigquery_storage_v1
from google.cloud.bigquery_storage_v1 import types

def insert_rows(dataset_id, table_id, rows):
    ...
    write_client = bigquery_storage_v1.BigQueryWriteClient()
    ...
    request = types.AppendRowsRequest()
    proto_data = types.AppendRowsRequest.ProtoData()
    proto_data.rows = proto_rows              # serialized protobuf rows built above
    proto_data.writer_schema = proto_schema   # proto descriptor matching the table schema
    request.proto_rows = proto_data
    request.write_stream = stream_name        # the table's default stream

    requests = [request]
    write_client.append_rows(iter(requests))

I believe I should send writer_schema and proto_rows each time because the schema can change depending on the table.

I’m worried about this limit.

Concurrent connections: 1,000 in a region; 10,000 in a multi-region

It seems like if many instances use append_rows at the same time, we might hit this limit. Is this true?

Also, is there a better way to insert this kind of data into BigQuery? (For example, a row is inserted when a user logs in; generally one record per Django request.)

Thanks


The BigQuery Storage Write API is designed for efficient, high-throughput data ingestion into BigQuery. When dealing with high concurrency, it’s essential to optimize the use of the API to prevent hitting quotas and maintain performance.

Connection Management

Effective connection management is key to optimizing the use of the BigQuery Storage Write API:

  • Client Instance Reuse: Instead of traditional connection pooling, focus on reusing BigQueryWriteClient instances across multiple requests. This approach minimizes the overhead associated with client instantiation and authentication, akin to the benefits of connection pooling in database systems. A sketch of this, combined with batching, follows the list below.

  • Batching Requests: Group multiple rows into a single request wherever possible. This reduces the number of API calls, conserves resources, and improves throughput. It’s a best practice to accumulate data and send it in larger batches rather than individual rows.
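
As a rough illustration of both points, here is a minimal sketch in Python. The module-level client, the PROJECT_ID constant, and the build_proto_rows / build_proto_schema helpers are placeholders for your own setup, not existing APIs:

from google.cloud import bigquery_storage_v1
from google.cloud.bigquery_storage_v1 import types

PROJECT_ID = "my-project"  # placeholder

# Created once per process and reused for every request.
_write_client = bigquery_storage_v1.BigQueryWriteClient()

def insert_rows(dataset_id, table_id, rows):
    # build_proto_rows / build_proto_schema are hypothetical helpers that
    # serialize `rows` with the protobuf message matching this table.
    proto_rows = build_proto_rows(table_id, rows)
    proto_schema = build_proto_schema(table_id)

    proto_data = types.AppendRowsRequest.ProtoData()
    proto_data.rows = proto_rows
    proto_data.writer_schema = proto_schema

    request = types.AppendRowsRequest()
    request.proto_rows = proto_data
    request.write_stream = (  # the table's default stream
        f"projects/{PROJECT_ID}/datasets/{dataset_id}/tables/{table_id}/_default"
    )

    # One API call for the whole batch instead of one call per row.
    responses = _write_client.append_rows(iter([request]))
    return list(responses)  # consume the stream so append errors surface here

The key differences from the snippet in the question are that the client is created once per process and that all rows passed to the function travel in a single append_rows call.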

Data Ingestion Optimization

Further strategies to optimize data ingestion include:

  • Schema Management: If your data schema does not change frequently, you can send the writer_schema once during the initial setup of the stream. For applications with evolving schemas, implement a versioning system or create new streams to manage schema updates without disrupting data ingestion.
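
With the raw client, this amounts to sending write_stream and writer_schema only on the first AppendRowsRequest of a connection and omitting them afterwards. A minimal sketch, assuming the per-batch ProtoRows objects and the ProtoSchema are already built elsewhere:

from google.cloud.bigquery_storage_v1 import types

def build_requests(stream_name, proto_schema, row_batches):
    # Only the first request on the connection carries the stream name and
    # the writer schema; later requests on the same connection reuse them.
    for i, proto_rows in enumerate(row_batches):
        proto_data = types.AppendRowsRequest.ProtoData()
        proto_data.rows = proto_rows
        request = types.AppendRowsRequest()
        if i == 0:
            proto_data.writer_schema = proto_schema
            request.write_stream = stream_name
        request.proto_rows = proto_data
        yield request

# responses = write_client.append_rows(build_requests(stream_name, schema, batches))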

Monitoring and Adjustments

  • Metrics Monitoring: Keep a close eye on metrics such as the number of active streams, append request rates, and error rates. These metrics will inform you when to scale your operations up or down and help maintain an efficient ingestion process.

Dedicated Streaming Services

  • Leveraging Google Pub/Sub and Dataflow: For complex scenarios that require data transformation or enrichment before ingestion, services like Google Pub/Sub and Dataflow can be invaluable. They offer features like message durability and the ability to replay messages, simplifying integration with other Google Cloud Platform services.
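
As an illustration, the Django side could simply publish each event to a Pub/Sub topic and leave the BigQuery writes to a subscriber or a Dataflow pipeline. The project, topic name, and attribute keys below are placeholders:

import json
from google.cloud import pubsub_v1

publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path("my-project", "events")  # placeholder topic

def publish_event(dataset_id, table_id, row):
    # The consumer (subscriber or Dataflow job) uses these attributes to
    # route the row to the right tenant table.
    future = publisher.publish(
        topic_path,
        data=json.dumps(row).encode("utf-8"),
        dataset_id=dataset_id,
        table_id=table_id,
    )
    return future.result()  # message ID once Pub/Sub has stored it

Because Pub/Sub absorbs bursts and handles retries, the web process publishing the event never holds a Storage Write API connection itself.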

Legacy Streaming Insert Method

  • Evaluating Legacy vs. Storage Write API: The legacy streaming insert method may be suitable for applications with predominantly small, single-row inserts due to its simplicity and low latency. However, it’s important to consider the cost implications at scale. The Storage Write API, while more complex, is generally more cost-effective for larger-scale, high-throughput scenarios.
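
For reference, a sketch of the legacy path being compared here (the insertAll-based streaming insert via the google-cloud-bigquery client):

from google.cloud import bigquery

bq_client = bigquery.Client()

def legacy_insert(dataset_id, table_id, rows):
    # `rows` is a list of dicts; the insertAll request quotas apply here.
    table_ref = f"{bq_client.project}.{dataset_id}.{table_id}"
    errors = bq_client.insert_rows_json(table_ref, rows)
    if errors:
        raise RuntimeError(f"insertAll failed: {errors}")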

Thanks for your thorough response!

That’s indeed a good idea. I will apply that, but I think this will not help with the concurrent connection limit.

I’d really like to do that, but since I’m inserting single rows when an event happens, I don’t see how I can batch them. Maybe putting a service in between could help, but I feel like that would be needlessly complex.

My schema changes very rarely, but the table that I’m inserting into keeps changing. For example, when a user logs in, I insert a row into the user table, but if something is tagged, it’s saved in the tags table. Is there a way to send writer_schema once with this limitation?

I’m doing exactly that: small, single-row inserts at high frequency. However, if the Storage Write API will still be better (despite the complexity) for this case, I want to adopt it.

Thanks again


Addressing each of your points for further clarification:

Client Instance Reuse: Reusing BigQueryWriteClient instances will indeed not directly reduce the number of concurrent connections since each write operation could potentially create a new connection. However, it does reduce the overhead of client creation and authentication. To manage the concurrent connection limit, you might want to consider implementing a queueing system where writes are queued and managed by a limited number of worker processes or threads, each with its own client instance. This way, you can control the maximum number of concurrent connections.
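
A minimal sketch of that queueing idea with a plain thread pool, where the number of worker threads caps the number of simultaneous connections. The worker count, queue size, and the adapted insert_rows helper are illustrative choices, not prescribed values:

import queue
import threading

from google.cloud import bigquery_storage_v1

MAX_WORKERS = 8                            # caps concurrent Write API connections
write_queue = queue.Queue(maxsize=10_000)  # bounded so bursts apply back-pressure

def worker():
    # One client per worker thread, created once and reused for its lifetime.
    client = bigquery_storage_v1.BigQueryWriteClient()
    while True:
        dataset_id, table_id, rows = write_queue.get()
        try:
            # insert_rows is your existing append logic, adapted to accept a client.
            insert_rows(client, dataset_id, table_id, rows)
        finally:
            write_queue.task_done()

for _ in range(MAX_WORKERS):
    threading.Thread(target=worker, daemon=True).start()

# In the Django view, enqueue instead of writing directly:
# write_queue.put((dataset_id, table_id, [row]))

Depending on your deployment, this loop could also run in a separate process rather than inside the web workers themselves.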

Batching Requests: For single-row inserts triggered by events, batching might seem less intuitive. However, you can still implement a delayed batching system. Here’s how it could work:

  • Asynchronous Queue: When an event occurs, instead of writing directly to BigQuery, you push the event data to an asynchronous queue.
  • Batching Service: A separate service consumes events from the queue and batches them. This service waits either for the batch to reach a certain size or a specified time interval to elapse before writing to BigQuery.
  • Immediate Feedback: If your application requires immediate confirmation of data being inserted, this can be simulated by acknowledging the addition to the queue.

This approach adds some complexity but can significantly reduce the number of write operations and help manage the concurrent connection limit.
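
Here is a minimal sketch of such a batching service, flushing when either a batch-size or a time threshold is reached; the thresholds and the flush_to_bigquery helper are placeholders:

import queue
import time
from collections import defaultdict

event_queue = queue.Queue()      # filled by the Django request handlers
MAX_BATCH_SIZE = 500             # placeholder threshold
MAX_WAIT_SECONDS = 5.0           # placeholder flush interval

def batching_loop():
    batches = defaultdict(list)  # (dataset_id, table_id) -> pending rows
    deadline = time.monotonic() + MAX_WAIT_SECONDS
    while True:
        timeout = max(deadline - time.monotonic(), 0)
        try:
            dataset_id, table_id, row = event_queue.get(timeout=timeout)
            batches[(dataset_id, table_id)].append(row)
        except queue.Empty:
            pass
        full = any(len(rows) >= MAX_BATCH_SIZE for rows in batches.values())
        if full or time.monotonic() >= deadline:
            for (dataset_id, table_id), rows in batches.items():
                if rows:
                    # flush_to_bigquery stands in for the actual Storage Write
                    # API append (e.g. the insert_rows function above).
                    flush_to_bigquery(dataset_id, table_id, rows)
            batches.clear()
            deadline = time.monotonic() + MAX_WAIT_SECONDS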

Schema Management: If you’re writing to different tables, you can still manage the writer_schema efficiently by creating a write stream for each table. Since your schema changes rarely, you can set up the schema once when you initialize the stream for a particular table. For each subsequent write to that table, you can reuse the stream without sending the schema again. This approach requires maintaining a mapping of streams to tables, which can be dynamically updated if a new table is added or the schema changes.
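
One way to sketch this in Python is with the AppendRowsStream helper from google.cloud.bigquery_storage_v1.writer, keeping one long-lived stream per table so the schema is only sent on each stream's first request. The cache dictionary and function names below are illustrative:

from google.cloud import bigquery_storage_v1
from google.cloud.bigquery_storage_v1 import types, writer

write_client = bigquery_storage_v1.BigQueryWriteClient()
_streams = {}  # (dataset_id, table_id) -> AppendRowsStream

def get_stream(project_id, dataset_id, table_id, proto_schema):
    # Open (or reuse) one long-lived stream per table; the writer schema
    # travels only once, in the stream's first request.
    key = (dataset_id, table_id)
    if key not in _streams:
        template = types.AppendRowsRequest()
        template.write_stream = (
            f"projects/{project_id}/datasets/{dataset_id}/tables/{table_id}/_default"
        )
        proto_data = types.AppendRowsRequest.ProtoData()
        proto_data.writer_schema = proto_schema
        template.proto_rows = proto_data
        _streams[key] = writer.AppendRowsStream(write_client, template)
    return _streams[key]

def append(project_id, dataset_id, table_id, proto_schema, proto_rows):
    stream = get_stream(project_id, dataset_id, table_id, proto_schema)
    request = types.AppendRowsRequest()
    request.proto_rows = types.AppendRowsRequest.ProtoData(rows=proto_rows)
    return stream.send(request).result()  # blocks until the append is acknowledged

Note that each cached stream keeps its own open connection, so with more than 1,000 tables this cache would itself run into the concurrent connection limit unless idle streams are closed or connections are shared via multiplexing.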

Evaluating Legacy vs. Storage Write API: The legacy streaming insert method is optimized for small, single-row inserts, but it has its own limits and costs, especially when scaled up. The Storage Write API, despite its complexity, is designed for higher throughput and can be more cost-effective at scale. It also offers more features, such as exactly-once data insertion semantics, which can be beneficial for ensuring data consistency.

Given your scenario of high-frequency, small, single-row inserts, the decision to move to the Storage Write API should be based on:

  • Cost Analysis: Compare the costs between the legacy method and the Storage Write API at your current and projected scale.
  • Performance Requirements: If the legacy method is meeting your performance needs without hitting quotas, it might be sufficient. However, if you’re facing limitations, the Storage Write API could provide the necessary performance improvements.
  • Complexity vs. Benefit: Evaluate whether the additional complexity of implementing the Storage Write API is justified by the benefits it provides in terms of performance, features, and cost.

While the Storage Write API is generally better for high-throughput scenarios, for your specific use case of high-frequency, small, single-row inserts, a thorough evaluation of costs, performance, and complexity is necessary to make an informed decision. Implementing a queueing and batching mechanism, even with the added complexity, could help you leverage the Storage Write API more effectively and manage the concurrent connection limit.


Thank you for the clarification

If I implement a delayed batching system, I think I will need to store queued data for each table, and I have more than 1,000 tables across all tenants (there are only about 30 schemas, though). I'm wondering whether it would work just as well to implement that system in Go or Java and enable multiplexing, without batching the incoming data.