Hello,
I have a streaming application for which I developed a pipeline that automatically creates the required datasets and tables (300+ datasets, each with 5 tables) and streams to them as data arrives.
I am using a GKE Deployment with 4 pods, with each pod accessing the default stream for each table through the method below:
from google.cloud.bigquery_storage_v1 import types, writer
from google.protobuf import descriptor_pb2

request_template = types.AppendRowsRequest()
request_template.write_stream = f"projects/{ct.BigQuery.PROJECT}/datasets/{dataset}/tables/{table}/_default"

# Build the writer schema from the generated protobuf message class.
proto_schema = types.ProtoSchema()
proto_descriptor = descriptor_pb2.DescriptorProto()
ct.BigQuery.TABLEINDEX[table]['proto'].DESCRIPTOR.CopyToProto(proto_descriptor)
proto_schema.proto_descriptor = proto_descriptor

proto_data = types.AppendRowsRequest.ProtoData()
proto_data.writer_schema = proto_schema
request_template.proto_rows = proto_data

# One bidirectional stream per table, reused for subsequent sends.
append_rows_stream = writer.AppendRowsStream(self.write_client, request_template)
return append_rows_stream
As of right now I am using the append_rows_stream send method to send individual rows one at a time (I will switch to batching, as recommended by the docs' best practices, in the future), but I am facing some issues. The main one is that I cannot get all pods to write to BigQuery; usually only one or two at most are writing concurrently through the streams.
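The batching mentioned above can be sketched as a small helper that groups serialized protobuf rows before building each AppendRowsRequest. This is a hypothetical helper, not part of the client library; the size cap reflects the Storage Write API's roughly 10 MB per-request limit, with some headroom assumed for request overhead.

```python
def chunk_rows(serialized_rows, max_bytes=9 * 1024 * 1024):
    """Group serialized protobuf rows into batches that stay under the
    Storage Write API's per-AppendRowsRequest size limit (~10 MB)."""
    batch, batch_size = [], 0
    for row in serialized_rows:
        # Start a new batch when adding this row would exceed the cap.
        if batch and batch_size + len(row) > max_bytes:
            yield batch
            batch, batch_size = [], 0
        batch.append(row)
        batch_size += len(row)
    if batch:
        yield batch
```

Each yielded batch would then go into one request via `proto_rows.rows.serialized_rows.extend(batch)` instead of one `send` call per row.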
I have also noticed the following error in the logs, although I don't know whether it is relevant, since I should be reopening the stream at write time if it detects that it has closed:
DEBUG:google.api_core.bidi:Thread-ConsumeBidirectionalStream caught error 409 Closing the stream because it has been inactive for 600 seconds. Entity: projects/...../_default
2024-05-13 17:18:30.855
Traceback (most recent call last):
  File "/usr/local/lib/python3.10/site-packages/google/api_core/grpc_helpers.py", line 116, in __next__
    return next(self._wrapped)
  File "/usr/local/lib/python3.10/site-packages/grpc/_channel.py", line 543, in __next__
    return self._next()
  File "/usr/local/lib/python3.10/site-packages/grpc/_channel.py", line 969, in _next
    raise self
grpc._channel._MultiThreadedRendezvous: <_MultiThreadedRendezvous of RPC that terminated with:
    status = StatusCode.ABORTED
    details = "Closing the stream because it has been inactive for 600 seconds. Entity: projects/...../_default"
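The reopen-at-write-time logic mentioned above can be sketched as a small wrapper. Everything here is hypothetical: `create_stream` stands in for a zero-argument factory like the method shown earlier, and the broad `except Exception` is a placeholder for the narrower stream-closed/ABORTED exceptions a real deployment would catch.

```python
def send_with_reopen(create_stream, request, max_attempts=3):
    """Send one request on an AppendRowsStream-like object, recreating
    the stream if the previous one is unusable (e.g. closed as ABORTED
    after 600 seconds of inactivity).

    `create_stream` is a zero-argument factory returning a fresh stream.
    """
    stream = None
    for attempt in range(max_attempts):
        try:
            if stream is None:
                stream = create_stream()
            return stream.send(request)
        except Exception:
            # The old stream is unusable; drop it and retry with a new one.
            stream = None
            if attempt == max_attempts - 1:
                raise
```

If the pods sit idle longer than the 600-second inactivity window, every first write after the pause would go through this recovery path, which may be worth watching for in the logs.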
As far as I can tell, using writer.AppendRowsStream should be the way to go, since I have a continuous flow of data and the default stream should be the correct choice, but if I should instead use client.append_rows(), or there is any fault in my reasoning, please do let me know.
Thanks
Edit: As additional context, my maximum throughput is around half a GB per second, and the tests I have run have been with only 50 datasets.