Hi, I am trying to build a Dataflow Flex Template using Apache Beam to sync data from Spanner to BigQuery.
I can't use the Google-provided Spanner to BigQuery Dataflow template because it doesn't support reading at a specific timestamp (Read From Timestamp), but Beam does.
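For context, this is roughly the timestamp-bound read I eventually need. The timestamp_bound_mode / read_timestamp parameters and the TimestampBoundMode enum are my reading of the Beam Python API docs, so treat this as a sketch rather than verified code; the project/instance/database values and the timestamp are placeholders:

import apache_beam as beam
from apache_beam import coders
from apache_beam.io.gcp.spanner import ReadFromSpanner, TimestampBoundMode
from typing import NamedTuple

class ExampleRow(NamedTuple):
    emr_member_code_id: str

coders.registry.register_coder(ExampleRow, coders.RowCoder)

with beam.Pipeline() as p:
    rows = (
        p
        | "ReadAtTimestamp" >> ReadFromSpanner(
            project_id='project_id',      # placeholder
            instance_id='instance_id',    # placeholder
            database_id='database_id',    # placeholder
            row_type=ExampleRow,
            sql='select to_base64(emr_member_code_id) as emr_member_code_id from emr_codes',
            # Read the table as of a fixed timestamp; these two parameters are
            # my understanding of the Beam docs, and the value is a placeholder.
            timestamp_bound_mode=TimestampBoundMode.READ_TIMESTAMP,
            read_timestamp='2025-08-25T00:00:00Z',
        )
    )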
I followed all the guides in the official documentation and in the getting-started Python repo, but didn't succeed.
Currently, I am facing a problem whenever the pipeline uses a transform from Beam's GCP I/O module on Dataflow.
Dockerfile:
FROM gcr.io/dataflow-templates-base/python3-template-launcher-base
ENV FLEX_TEMPLATE_PYTHON_REQUIREMENTS_FILE="/template/requirements.txt"
ENV FLEX_TEMPLATE_PYTHON_PY_FILE="/template/main.py"
COPY . /template
RUN apt-get update \
    # Install any apt packages if required by your template pipeline.
    && apt-get install -y libffi-dev git \
    && rm -rf /var/lib/apt/lists/* \
    # Upgrade pip and install the requirements.
    && pip install --no-cache-dir --upgrade pip \
    # Install dependencies from requirements file in the launch environment.
    && pip install --no-cache-dir -r $FLEX_TEMPLATE_PYTHON_REQUIREMENTS_FILE \
    # When FLEX_TEMPLATE_PYTHON_REQUIREMENTS_FILE option is used,
    # then during Template launch Beam downloads dependencies
    # into a local requirements cache folder and stages the cache to workers.
    # To speed up Flex Template launch, pre-download the requirements cache
    # when creating the Template.
    && pip download --no-cache-dir --dest /tmp/dataflow-requirements-cache -r $FLEX_TEMPLATE_PYTHON_REQUIREMENTS_FILE
# Set this if using Beam 2.37.0 or earlier SDK to speed up job submission.
ENV PIP_NO_DEPS=True
ENTRYPOINT ["/opt/google/dataflow/python_template_launcher"]
main.py:
import logging
import apache_beam as beam
from apache_beam import coders
from apache_beam.io.gcp.spanner import ReadFromSpanner
from typing import NamedTuple
from apache_beam.io.gcp.bigquery import WriteToBigQuery
from apache_beam.options.pipeline_options import PipelineOptions
LOG = logging.getLogger(__name__)
LOG.setLevel(logging.INFO)
def run():
    # Parse the pipeline options passed into the application.
    class MyOptions(PipelineOptions):
        # Define a custom pipeline option that specifies the Cloud Storage bucket.
        @classmethod
        def _add_argparse_args(cls, parser):
            parser.add_argument("--output", required=True)

    # Parse the command-line arguments into the custom options class.
    options = MyOptions()

    class ExampleRow(NamedTuple):
        emr_member_code_id: str

    coders.registry.register_coder(ExampleRow, coders.RowCoder)

    with beam.Pipeline(options=options) as p:
        (
            p
            | "ReadFromSpanner" >> ReadFromSpanner(
                instance_id='instance_id',
                database_id='database_id',
                project_id='project_id',
                row_type=ExampleRow,
                sql='select to_base64(emr_member_code_id) as emr_member_code_id from emr_codes',
            )
            | "ToDict" >> beam.Map(lambda r: r._asdict())
            | "WriteToBQ" >> WriteToBigQuery(
                table="pp-import-staging.OCEAN_11039.emr_codes",
                schema="emr_member_code_id:BYTES",
                write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE,
                create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
                custom_gcs_temp_location='gs://staging-dataflow-tmp/tmp',
            )
        )

if __name__ == '__main__':
    run()
requirements.txt:
apache-beam[gcp]==2.66.0
I am building the Docker image and the template JSON file with:
gcloud dataflow flex-template build gs://sxp-ocean-test/dataflow/mytemplate_2.json \
    --image-gcr-path "us-east4-docker.pkg.dev/pp-ocean-staging/sxope-emr-files-loader/mytemplate_2:latest" \
    --sdk-language "PYTHON" \
    --flex-template-base-image "PYTHON3" \
    --metadata-file "metadata.json" \
    --py-path "." \
    --env "FLEX_TEMPLATE_PYTHON_PY_FILE=main.py" \
    --env "FLEX_TEMPLATE_PYTHON_REQUIREMENTS_FILE=requirements.txt" \
    --gcs-log-dir "gs://sxp-ocean-test/logs/"
And running it with:
gcloud dataflow flex-template run mytemplate-$(date +%Y%m%d-%H%M%S) \
    --template-file-gcs-location=gs://sxp-ocean-test/dataflow/mytemplate_2.json \
    --parameters=output="gs://sxp-ocean-test/output/" \
    --region=us-east4 \
    --temp-location=gs://sxp-ocean-staging-dataflow-tmp/tmp \
    --staging-location=gs://sxp-ocean-staging/dataflow/staging \
    --num-workers=1
And get an error:
2025-08-25 14:32:46.400 GET
Error: Template launch failed: exit status 1
2025-08-25 14:35:16.912 GET
Error occurred in the launcher container: Template launch failed. See console logs.
But no logs appear in the console or in the GCS temp locations.
If I substitute the Beam pipeline code with something that doesn't use Beam's GCP I/O module, like:
with beam.Pipeline(options=options) as p:
    (
        p
        | 'CreateDemo' >> beam.Create(['A', 'B', 'C'])
        | 'WriteToText' >> beam.io.WriteToText(options.output)
    )
the Dataflow job launches successfully and completes without any errors.
Also, I can successfully submit the same pipeline to Dataflow from my local machine using:
options = PipelineOptions([
    '--runner=DataflowRunner',
    ...
])

with beam.Pipeline(options=options) as p:
    (
        p
        | "ReadFromSpanner" >> ReadFromSpanner(
            instance_id='instance_id',
            database_id='database_id',
            project_id='project_id',
            row_type=ExampleRow,
            sql='select to_base64(emr_member_code_id) as emr_member_code_id from emr_codes',
        )
        | "ToDict" >> beam.Map(lambda r: r._asdict())
        | "WriteToBQ" >> WriteToBigQuery(
            table="pp-import-staging.OCEAN_11039.emr_codes",
            schema="emr_member_code_id:BYTES",
            write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE,
            create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
            custom_gcs_temp_location='gs://sxp-ocean-staging-dataflow-tmp/tmp',
        )
    )
So, as far as I can see, the code works when launched directly, but when I build the Docker image, push it to the registry, and launch the template with gcloud, it fails.