Facing some issues streaming to BQ using Python Storage Write API

Hello,

I have a streaming application for which I have developed a pipeline that automatically creates the required datasets and tables (300+ datasets, each with 5 tables) and streams to them as data arrives.

I am using a GKE Deployment with 4 pods, each pod opening the default stream for each table through the method below:

from google.cloud.bigquery_storage_v1 import types, writer
from google.protobuf import descriptor_pb2

# Template request pointing at the table's default stream.
request_template = types.AppendRowsRequest()
request_template.write_stream = (
    f"projects/{ct.BigQuery.PROJECT}/datasets/{dataset}/tables/{table}/_default"
)

# Build the writer schema from the generated protobuf class for this table.
proto_schema = types.ProtoSchema()
proto_descriptor = descriptor_pb2.DescriptorProto()
ct.BigQuery.TABLEINDEX[table]['proto'].DESCRIPTOR.CopyToProto(proto_descriptor)
proto_schema.proto_descriptor = proto_descriptor

proto_data = types.AppendRowsRequest.ProtoData()
proto_data.writer_schema = proto_schema
request_template.proto_rows = proto_data

# Open a managed bidirectional stream over the shared BigQueryWriteClient.
append_rows_stream = writer.AppendRowsStream(self.write_client, request_template)

return append_rows_stream

As of right now I am using the append_rows_stream send method to send individual rows one at a time (I will switch to batching, as recommended by the docs' best practices, in the future). However, I am facing some issues. The main one is that I do not manage to get all pods writing to BigQuery; instead it is usually only one or two at most writing concurrently through the streams.
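For the batching I plan to move to, something like the sketch below is what I have in mind: a small helper that groups already-serialized protobuf rows into batches that stay under a request-size cap. The 9 MB default is an assumption on my part (leaving headroom under the 10 MB AppendRows request limit I have seen mentioned, which should be verified against current quotas), and batch_serialized_rows is a hypothetical helper name:

```python
def batch_serialized_rows(rows, max_bytes=9 * 1024 * 1024):
    """Group serialized protobuf rows (bytes) into batches whose total
    size stays under max_bytes, so each batch can go into a single
    AppendRowsRequest instead of one request per row."""
    batch, size = [], 0
    for row in rows:
        # Start a new batch if adding this row would exceed the cap.
        if batch and size + len(row) > max_bytes:
            yield batch
            batch, size = [], 0
        batch.append(row)
        size += len(row)
    if batch:
        yield batch
```

Each yielded batch would then be placed into the serialized_rows of one ProtoRows message before sending.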

I have also noticed the following error in the logs, although I don't know how relevant it is, since I should be reopening the stream at write time if it is detected as closed:

DEBUG:google.api_core.bidi:Thread-ConsumeBidirectionalStream caught error 409 Closing the stream because it has been inactive for 600 seconds. Entity: projects/...../_default

Traceback (most recent call last):
  File "/usr/local/lib/python3.10/site-packages/google/api_core/grpc_helpers.py", line 116, in __next__
    return next(self._wrapped)
  File "/usr/local/lib/python3.10/site-packages/grpc/_channel.py", line 543, in __next__
    return self._next()
  File "/usr/local/lib/python3.10/site-packages/grpc/_channel.py", line 969, in _next
    raise self
grpc._channel._MultiThreadedRendezvous: <_MultiThreadedRendezvous of RPC that terminated with:
	status = StatusCode.ABORTED
	details = "Closing the stream because it has been inactive for 600 seconds. Entity: projects/...../_default
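The reopen-on-closure logic I mentioned is roughly the following sketch: a generic wrapper that recreates the stream and retries when a send fails (for example after the 600-second inactivity ABORTED above). make_stream, build_request, and send_with_retry are hypothetical names standing in for my actual setup code, and whether the closure surfaces as an exception on send or on the returned future is an assumption to check:

```python
import time

def send_with_retry(make_stream, build_request, max_attempts=3):
    """Send one request, recreating the stream if the server has closed
    it. make_stream() returns a fresh AppendRowsStream-like object;
    build_request() returns the request to send."""
    stream = make_stream()
    for attempt in range(max_attempts):
        try:
            return stream.send(build_request())
        except Exception:
            # Assume the failure means a dead stream: drop it, back off,
            # and open a new one before retrying.
            stream.close()
            time.sleep(2 ** attempt - 1)  # 0s, 1s, 3s, ...
            stream = make_stream()
    raise RuntimeError(f"send failed after {max_attempts} attempts")
```

Even with this in place, pods that go quiet for 10 minutes will keep hitting the inactivity closure, so the wrapper mainly makes it transparent rather than preventing it.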

As far as I can tell, using writer.AppendRowsStream should be the way to go, since I have a continuous flow of data, and the default stream should be the correct choice; but if I should instead use client.append_rows(), or there is any fault in my reasoning, please do let me know.

Thanks

Edit: As additional context, my max throughput is around half a Gb per second, and the tests I have run so far have used only 50 datasets.
