Dataflow Flex Template: Spanner to BigQuery

Hi, I am trying to build a Dataflow Flex Template with Apache Beam to sync data from Spanner to BigQuery.

I can't use the Google-provided Spanner to BigQuery template because it doesn't support reading from a timestamp, but Beam does.
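
For context on the timestamp read mentioned above: as far as I know, Beam's Spanner read accepts the read timestamp as an RFC 3339 string (treat the exact parameter name and format as an assumption to verify against the Beam version in use). A minimal standard-library sketch of producing such a string, where the helper name `to_rfc3339_utc` is hypothetical:

```python
from datetime import datetime, timezone


def to_rfc3339_utc(moment: datetime) -> str:
    """Format a timezone-aware datetime as an RFC 3339 UTC timestamp ('Z' suffix)."""
    if moment.tzinfo is None:
        # A naive datetime is ambiguous, so refuse it rather than guess the zone.
        raise ValueError("expected a timezone-aware datetime")
    utc = moment.astimezone(timezone.utc)
    return utc.strftime("%Y-%m-%dT%H:%M:%S.%fZ")


# Example: a fixed snapshot moment expressed in UTC.
read_timestamp = to_rfc3339_utc(datetime(2025, 8, 25, 14, 0, 0, tzinfo=timezone.utc))
```

The resulting string could then be passed to whichever read-timestamp option the chosen Spanner transform exposes.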

I followed all the guides in the official documentation and in the getting-started Python repo, but didn't succeed.

Currently, I am facing a problem when trying to use a transform from Beam's GCP module in Dataflow.

Dockerfile:

FROM gcr.io/dataflow-templates-base/python3-template-launcher-base

ENV FLEX_TEMPLATE_PYTHON_REQUIREMENTS_FILE="/template/requirements.txt"
ENV FLEX_TEMPLATE_PYTHON_PY_FILE="/template/main.py"

COPY . /template

RUN apt-get update \
    # Install any apt packages if required by your template pipeline.
    && apt-get install -y libffi-dev git \
    && rm -rf /var/lib/apt/lists/* \
    # Upgrade pip and install the requirements.
    && pip install --no-cache-dir --upgrade pip \
    # Install dependencies from requirements file in the launch environment.
    && pip install --no-cache-dir -r $FLEX_TEMPLATE_PYTHON_REQUIREMENTS_FILE \
    # When FLEX_TEMPLATE_PYTHON_REQUIREMENTS_FILE  option is used,
    # then during Template launch Beam downloads dependencies
    # into a local requirements cache folder and stages the cache to workers.
    # To speed up Flex Template launch, pre-download the requirements cache
    # when creating the Template.
    && pip download --no-cache-dir --dest /tmp/dataflow-requirements-cache -r $FLEX_TEMPLATE_PYTHON_REQUIREMENTS_FILE

# Set this if using Beam 2.37.0 or earlier SDK to speed up job submission.
ENV PIP_NO_DEPS=True

ENTRYPOINT ["/opt/google/dataflow/python_template_launcher"]

main.py

import logging

import apache_beam as beam
from apache_beam import coders
from apache_beam.io.gcp.spanner import ReadFromSpanner
from typing import NamedTuple
from apache_beam.io.gcp.bigquery import WriteToBigQuery
from apache_beam.options.pipeline_options import PipelineOptions

LOG = logging.getLogger(__name__)
LOG.setLevel(logging.INFO)


def run():
    # Parse the pipeline options passed into the application.
    class MyOptions(PipelineOptions):
        # Define a custom pipeline option that specifies the Cloud Storage bucket.
        @classmethod
        def _add_argparse_args(cls, parser):
            parser.add_argument("--output", required=True)

    # Instantiating the options class parses the command-line arguments.
    options = MyOptions()

    class ExampleRow(NamedTuple):
        emr_member_code_id: str

    coders.registry.register_coder(ExampleRow, coders.RowCoder)

    with beam.Pipeline(options=options) as p:
        (
            p
            | "ReadFromSpanner" >> ReadFromSpanner(
                instance_id='instance_id',
                database_id='database_id',
                project_id='project_id',
                row_type=ExampleRow,
                sql='select to_base64(emr_member_code_id) as emr_member_code_id from emr_codes',
            )
            | "ToDict" >> beam.Map(lambda r: r._asdict())

            | "WriteToBQ" >> WriteToBigQuery(
                table="pp-import-staging.OCEAN_11039.emr_codes",
                schema="emr_member_code_id:BYTES",
                write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE,
                create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
                custom_gcs_temp_location='gs://staging-dataflow-tmp/tmp',
            )
        )


if __name__ == '__main__':
    run()

requirements.txt

apache-beam[gcp]==2.66.0

I build the Docker image and the template JSON file with a command like:

gcloud dataflow flex-template build gs://sxp-ocean-test/dataflow/mytemplate_2.json \
 --image-gcr-path "us-east4-docker.pkg.dev/pp-ocean-staging/sxope-emr-files-loader/mytemplate_2:latest" \
 --sdk-language "PYTHON" \
 --flex-template-base-image "PYTHON3" \
 --metadata-file "metadata.json" \
 --py-path "." \
 --env "FLEX_TEMPLATE_PYTHON_PY_FILE=main.py" \
 --env "FLEX_TEMPLATE_PYTHON_REQUIREMENTS_FILE=requirements.txt" \
 --gcs-log-dir "gs://sxp-ocean-test/logs/"

And run it with:

gcloud dataflow flex-template run mytemplate-$(date +%Y%m%d-%H%M%S) \
 --template-file-gcs-location=gs://sxp-ocean-test/dataflow/mytemplate_2.json \
 --parameters=output="gs://sxp-ocean-test/output/" \
 --region=us-east4 \
 --temp-location=gs://sxp-ocean-staging-dataflow-tmp/tmp \
 --staging-location=gs://sxp-ocean-staging/dataflow/staging \
 --num-workers=1

And I get an error:

2025-08-25 14:32:46.400 GET

Error: Template launch failed: exit status 1

2025-08-25 14:35:16.912 GET

Error occurred in the launcher container: Template launch failed. See console logs.

But no logs appear in the console or in the GCS temp locations.
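
One place such launcher failures may show up is Cloud Logging rather than the job page. The sketch below builds a Logs Explorer filter for the launcher log. The log name `dataflow.googleapis.com/launcher` is an assumption about where Flex Template launcher output lands, and the helper name, project ID, and job ID are hypothetical placeholders.

```python
def launcher_log_filter(project_id: str, job_id: str) -> str:
    """Build a Cloud Logging filter for a Flex Template launcher's log entries."""
    # Assumption: launcher output is written to the dataflow.googleapis.com/launcher log.
    log_name = f"projects/{project_id}/logs/dataflow.googleapis.com%2Flauncher"
    clauses = [
        'resource.type="dataflow_step"',
        f'resource.labels.job_id="{job_id}"',
        f'logName="{log_name}"',
    ]
    return " AND ".join(clauses)


# Hypothetical project and job IDs, for illustration only.
print(launcher_log_filter("my-project", "my-job-id"))
```

The printed filter can be pasted into Logs Explorer. It deliberately has no severity clause, since launcher entries are not necessarily tagged as errors.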

If I replace the Beam pipeline code with something that doesn't use Beam's GCP module, like

with beam.Pipeline(options=options) as p:
    (
        p
        | 'CreateDemo' >> beam.Create(['A', 'B', 'C'])
        | 'WriteToText' >> beam.io.WriteToText(options.output)
    )

the Dataflow job launches successfully and completes without any errors.

Also, I can successfully submit the job to Dataflow from my local machine using


options = PipelineOptions([
        '--runner=DataflowRunner',
        ...
])

with beam.Pipeline(options=options) as p:
        (
            p
            | "ReadFromSpanner" >> ReadFromSpanner(
                instance_id='instance_id',
                database_id='database_id',
                project_id='project_id',
                row_type=ExampleRow,
                sql='select to_base64(emr_member_code_id) as emr_member_code_id from emr_codes',
            )
            | "ToDict" >> beam.Map(lambda r: r._asdict())

            | "WriteToBQ" >> WriteToBigQuery(
                table="pp-import-staging.OCEAN_11039.emr_codes",
                schema="emr_member_code_id:BYTES",
                write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE,
                create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
                custom_gcs_temp_location='gs://sxp-ocean-staging-dataflow-tmp/tmp',
            )
        )

So, as far as I can see, the code works when submitted directly, but when I build the Docker image, push it to the registry, and launch it using gcloud, it fails.


Hi @Andrei_Strenkovskii,

Your pipeline works when launched locally, so the code itself is most likely correct. The problem more likely lies in the configuration of the Flex Template and the Docker environment.

Here are the things to check:

  • Docker environment: Ensure that the image is set up correctly, with all dependencies listed in your requirements.txt and authentication configured.
  • Permissions: Create a service account with permissions as described here.
  • GCS bucket: Create a GCS bucket and add the newly created service account as a principal.
  • Local credentials: Run gcloud auth application-default login in your local shell.

I’ve found a case similar to yours; check out the full details here. You might also want to check the following documentation to troubleshoot your Dataflow Flex Template issue: Early Startup Issues and Flex Template launcher logs show wrong severity.

Thanks for the quick response.
Links to the documentation would be invaluable. I will take a look.

My main point is that the pipeline works as soon as I replace the Spanner part of the code.

For example, if I remove the ReadFromSpanner transform and write a dummy collection to BigQuery instead, the pipeline works fine.

table_spec = "pp-import-staging.OCEAN_11039.test_table"

table_schema = {
    "fields": [
        {"name": "id", "type": "INTEGER", "mode": "REQUIRED"},
        {"name": "name", "type": "STRING", "mode": "REQUIRED"},
        {"name": "age", "type": "INTEGER", "mode": "NULLABLE"},
    ]
}

with beam.Pipeline(options=options) as p:
    (
        p
        | "Start" >> beam.Create([None])  # dummy PCollection
        | "Generate Rows" >> beam.ParDo(GenerateRows())
        | "Write to BQ" >> beam.io.WriteToBigQuery(
            table=table_spec,
            schema=table_schema,
            write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
            create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
            custom_gcs_temp_location='gs://sxp-ocean-staging-dataflow-tmp/tmp',
        )
    )

From my point of view:

  • The Docker image is correct.
  • I have all the necessary access.

But it just doesn’t work with Spanner.
I also tried adding Java to the Docker image, because the Beam documentation says the following, but it didn’t help:

There are several ways to setup cross-language Spanner transforms.

  • Option 1: use the default expansion service
  • Option 2: specify a custom expansion service
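
Both options rely on a Java expansion service. With the default one, my understanding is that Beam starts a Java process while the pipeline is being constructed, which for a Flex Template means inside the launcher container, so `java` would need to be on that image's PATH. A small preflight sketch (the function names are hypothetical) that fails with a descriptive message instead of a bare `exit status 1`:

```python
import shutil


def find_java(which=shutil.which):
    """Return the path of the `java` executable, or None if it is not on PATH."""
    return which("java")


def require_java(which=shutil.which):
    """Raise a descriptive error when no Java runtime is available."""
    java_path = find_java(which)
    if java_path is None:
        raise RuntimeError(
            "java not found on PATH; the cross-language Spanner transform "
            "needs a Java runtime where the pipeline is constructed"
        )
    return java_path
```

Calling `require_java()` at the top of `run()` in main.py would show whether the launcher container actually sees the Java installation added to the image.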

Hi @ms4446,
Sorry to call you out of the blue.
I am still looking for an example of a Flex Template with Spanner read or write operations.
Have you ever seen one?

I accidentally found a Stack Overflow topic with a similar problem.