Dataflow: ModuleNotFoundError: No module named 'src'

Hi

I've been trying to launch a Dataflow job with Flex Templates using the Python SDK. The job starts and then fails with the error ModuleNotFoundError: No module named 'src'.

I’ll provide some context:


File tree:

.
├── Dockerfile
├── setup.py
├── requirements.txt
├── metadata.json
├── e_commerce_batch.py
└── src/
    └── processors/
        └── functions.py

Then, in cloud shell I run the following:

gcloud dataflow flex-template build gs://${bucket}/e_commerce_batch.json \
  --image-gcr-path "${region}-docker.pkg.dev/${proyecto}/${artifact_registry_name}/dataflow/e_commerce_batch:latest" \
  --sdk-language "PYTHON" \
  --flex-template-base-image "PYTHON3" \
  --metadata-file "metadata.json" \
  --py-path "." \
  --py-path "src/" \
  --py-path "src/processors/" \
  --env "FLEX_TEMPLATE_PYTHON_PY_FILE=e_commerce_batch.py" \
  --env "FLEX_TEMPLATE_PYTHON_REQUIREMENTS_FILE=requirements.txt"

What am I missing? I don’t want to move the src.processors code to the main python file (e_commerce_batch.py) because that would make that file less readable.
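
For illustration, a minimal setup.py for this kind of layout could be as simple as the sketch below (the package name and version here are placeholders, not the actual file):

# Hypothetical minimal setup.py; name and version are placeholders.
import setuptools

setuptools.setup(
    name="e-commerce-batch",              # placeholder package name
    version="0.1.0",                      # placeholder version
    packages=setuptools.find_packages(),  # should pick up src and src.processors (each needs an __init__.py)
    install_requires=[],                  # worker dependencies usually stay in requirements.txt
)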


Best regards
David Regalado
Web | Linkedin | Cloudskillsboost


UPDATE: I forgot to add the FLEX_TEMPLATE_PYTHON_SETUP_FILE when launching. Now I get a new error :sweat_smile:

gcloud dataflow flex-template build gs://${bucket}/e_commerce_batch.json \
  --image-gcr-path "${region}-docker.pkg.dev/${proyecto}/${artifact_registry_name}/dataflow/e_commerce_batch:latest" \
  --sdk-language "PYTHON" \
  --flex-template-base-image "PYTHON3" \
  --metadata-file "metadata.json" \
  --py-path "." \
  --py-path "src/" \
  --py-path "src/processors/" \
  --env "FLEX_TEMPLATE_PYTHON_PY_FILE=e_commerce_batch.py" \
  --env "FLEX_TEMPLATE_PYTHON_REQUIREMENTS_FILE=requirements.txt" \
  --env "FLEX_TEMPLATE_PYTHON_SETUP_FILE=setup.py"

The new error:

sales_target_parse is not defined

sales_target_parse is a function declared in src/processors/functions.py. It's as if the main code is not importing the functions from that path.

This is line 65 in the main file (e_commerce_batch.py): [screenshot not included]

This is sales_target_parse's code in src/processors/functions.py: [screenshot not included]


Best regards
David Regalado
Web | Linkedin | Cloudskillsboost


The error message sales_target_parse is not defined indicates that the Dataflow runner cannot find the sales_target_parse function. This is because the src/processors/functions.py module is not imported in the e_commerce_batch.py file.

To fix this error, you need to import the src/processors/functions.py module in the e_commerce_batch.py file. You can do this by adding the following line to the top of the e_commerce_batch.py file:

from src.processors.functions import sales_target_parse

Once you have imported the src/processors/functions.py module, you will be able to access the sales_target_parse function in the e_commerce_batch.py file.

Here is an example of how to import the src/processors/functions.py module in the e_commerce_batch.py file:

from src.processors.functions import sales_target_parse

def main():
  # ...

  sales_target_parse(data)

if __name__ == '__main__':
  main()

Once you have made this change, you should be able to run your Dataflow job without errors.


Hello, and thanks for the fast reply. The original post shows that I'm already importing that function. Nevertheless, I did another run with

from src.processors.functions import sales_target_parse

but I got the same error. Could it be something I'm missing in the Dockerfile?


Best regards
David Regalado
Web | Linkedin | Cloudskillsboost


It is possible that there is something missing in your Dockerfile that is preventing Python from finding the sales_target_parse function.

Here are a few things to check:

  1. Include the src Directory: Make sure that the src directory is included in the COPY statement in your Dockerfile.

  2. Function Definition: Ensure that the sales_target_parse function is defined in a Python file that is included in the COPY statement, and that it is not defined inside an if __name__ == "__main__": block or hidden behind other conditionals, which would prevent it from being imported.

  3. Module-Level Definition: In Python, any function defined at the top level of a module is importable by default. Make sure sales_target_parse is defined at module level in functions.py, not nested inside another function or class.

Here is an example of a Dockerfile that you can use to build a Python image that includes the sales_target_parse function:

FROM python:3.10-slim

WORKDIR /src

# Install the dependencies first so this layer is cached when only the code changes.
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

# Copy the application code.
COPY src .

CMD ["python", "my_script.py"]

Note: The CMD directive specifies the default command to run when the container starts. If your intention is to run a Dataflow job, you might need a different command.

If you are still having problems, you can try adding the following line to your Dockerfile:

ENV PYTHONPATH=/src:$PYTHONPATH

This ensures that when Python tries to import a module, it also looks in the /src directory, which is where your custom modules are located.

Once you have updated your Dockerfile, you can rebuild your image and run it again.
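
If the import still fails after rebuilding, it can also help to log the interpreter's search path and fail fast at the top of the main file, so the real cause shows up in the Dataflow launcher logs (a small sketch; adjust the module path to your project):

# Sketch: log the import search path and fail fast if the module cannot be imported.
import logging
import sys

logging.getLogger(__name__).info("sys.path: %s", sys.path)

try:
    from src.processors.functions import sales_target_parse  # noqa: F401
except ImportError:
    logging.getLogger(__name__).exception("Could not import src.processors.functions")
    raise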


This is my current Dockerfile:

FROM gcr.io/dataflow-templates-base/python3-template-launcher-base

ARG WORKDIR=/template
RUN mkdir -p ${WORKDIR}
RUN mkdir -p ${WORKDIR}/src
RUN mkdir -p ${WORKDIR}/src/processors
WORKDIR ${WORKDIR}

ARG PYTHON_PY_FILE=e_commerce_batch.py

COPY src /src
COPY . .

ENV PYTHONPATH ${WORKDIR}
ENV PYTHONPATH=/src:$PYTHONPATH
ENV PYTHONPATH=/src/processors:$PYTHONPATH

ENV FLEX_TEMPLATE_PYTHON_PY_FILE="${WORKDIR}/${PYTHON_PY_FILE}"
ENV FLEX_TEMPLATE_PYTHON_REQUIREMENTS_FILE="${WORKDIR}/requirements.txt"
ENV FLEX_TEMPLATE_PYTHON_SETUP_FILE="${WORKDIR}/setup.py"

# We could get rid of installing libffi-dev and git, or we could leave them.
RUN apt-get update \
# Upgrade pip and install the requirements.
&& pip install --upgrade pip \
&& pip install --no-cache-dir --upgrade pip \
&& pip download --dest /tmp/dataflow-requirements-cache -r $FLEX_TEMPLATE_PYTHON_REQUIREMENTS_FILE

# Since we already downloaded all the dependencies, there's no need to rebuild everything.
ENV PIP_NO_DEPS=True

ENTRYPOINT ["/opt/google/dataflow/python_template_launcher"]

I added the

ENV PYTHONPATH=/src:$PYTHONPATH

And in the python code, I added the following:

from src.processors.functions import debug, list_of_orders_parse, sales_target_parse, order_details_parse, valid_rows, valid_columns, valid_columns, merge_datasets, merge_datasets2

Same error.

Best regards
David Regalado
Web | Linkedin | Cloudskillsboost


I’m glad to hear that you’ve made some progress. It sounds like you’re almost there!

The error message sales_target_parse is not defined still indicates that the Dataflow runner cannot find the sales_target_parse function. This could be due to a few reasons:

  • The src/processors/functions.py module might not be in the PYTHONPATH environment variable.
  • There could be an issue with how the sales_target_parse function is being imported or used.

To troubleshoot this issue:

  1. Check the PYTHONPATH: Ensure that the PYTHONPATH in your Dockerfile or the environment configuration of your Dataflow job includes the src/processors/ directory.

  2. Exporting Functions: If you’re using wildcard imports (from module import *), ensure that the src/processors/functions.py file has the following line:

    __all__ = ['sales_target_parse']

    This ensures that the sales_target_parse function is available for wildcard imports. However, if you’re importing the function directly, this step might not be necessary.

Once you’ve checked these points, try running your Dataflow job again.

Additional Tips:

  • Consider running the Dataflow job with a single worker for debugging purposes (see the sketch after this list). This can help identify whether specific workers are having trouble.
  • Review the Dataflow job logs for any other errors or warnings that might provide more insights.
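
For the single-worker experiment, the worker count can be pinned through the standard WorkerOptions view (a sketch, not specific to this template):

from apache_beam.options.pipeline_options import PipelineOptions, WorkerOptions

options = PipelineOptions()
worker_options = options.view_as(WorkerOptions)
worker_options.num_workers = 1      # start with a single worker
worker_options.max_num_workers = 1  # keep autoscaling from adding more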

Same error :disappointed_face:


Best regards
David Regalado
Web | Linkedin | Cloudskillsboost


EUREKA!

I've solved the issue by adding save_main_session=True in my Python code!

See Pickling and Managing the Main Session for more info.
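
For anyone landing here later, this is roughly where the option goes in the pipeline code (a minimal sketch; everything except the save_main_session line is boilerplate):

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions, SetupOptions


def run(argv=None):
    pipeline_options = PipelineOptions(argv)
    # Pickle the main session so module-level imports (e.g. sales_target_parse)
    # are available on the Dataflow workers.
    pipeline_options.view_as(SetupOptions).save_main_session = True

    with beam.Pipeline(options=pipeline_options) as p:
        ...  # pipeline steps as before


if __name__ == '__main__':
    run()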


Best regards
David Regalado
Web | Linkedin | Cloudskillsboost


That’s fantastic news! I’m glad you found the solution.

The save_main_session option is indeed crucial when using Apache Beam (and by extension, Dataflow) with Python. It ensures that the main session’s global context is pickled and made available to the workers, allowing them to access global variables, modules, and functions defined in the main module. Without this, certain dependencies and modules might not be available to the workers, leading to the kind of errors you were seeing.
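
To make that concrete, here is a tiny sketch (not the original pipeline) of the pattern that fails without the option: a name imported at module level and referenced inside a transform only exists on the workers if the main session is pickled.

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions, SetupOptions

# Imported at module level in the main file (module name taken from the thread).
from src.processors.functions import sales_target_parse


def run(argv=None):
    options = PipelineOptions(argv)
    # Without this line, the lambda below is unpickled on a worker where the
    # global name sales_target_parse was never defined, raising NameError.
    options.view_as(SetupOptions).save_main_session = True

    with beam.Pipeline(options=options) as p:
        (p
         | 'Create' >> beam.Create(['sample,row'])
         | 'Parse' >> beam.Map(lambda row: sales_target_parse(row)))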

Thank you for sharing the solution and the reference link. It will be beneficial for others who might encounter a similar issue.


Hey @davidregalado25, can you tell me where to add save_main_session=True in the Python code?

main.py

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions, StandardOptions, GoogleCloudOptions
from apache_beam.io.gcp.pubsub import ReadFromPubSub, WriteToPubSub
from transform_messages import TransformMessages
import datetime

def run(argv=None):
    pipeline_options = PipelineOptions(argv)
    pipeline_options.view_as(StandardOptions).streaming = True

    # Add the service account email using GoogleCloudOptions
    google_cloud_options = pipeline_options.view_as(GoogleCloudOptions)
    google_cloud_options.service_account_email = ''

    # Set the job name
    job_name = f"dataflow-job-{datetime.datetime.now().strftime('%Y%m%d-%H%M%S')}"
    google_cloud_options.job_name = job_name

    # Set the save_main_session option directly on the PipelineOptions object
    pipeline_options.save_main_session = True

    with beam.Pipeline(options=pipeline_options) as p:
        messages = (p
                    | 'ReadFromPubSub' >> ReadFromPubSub(topic='projects/dev-payment-analytics/topics/mysql.dataflow.PAYMENT')
                    | 'TransformMessages' >> beam.ParDo(TransformMessages())
                    | 'WriteToPubSub' >> WriteToPubSub(topic='projects/dev-payment-analytics/topics/dataflow_bigquery_push')
                    )

if __name__ == '__main__':
    run()

It's not working for the above file.


Hi @uday_varma

Did you solve your issue? Your pipeline should look a little bit like this:
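
A minimal sketch based on the code you posted; the key change is setting save_main_session through the SetupOptions view rather than directly on the PipelineOptions object:

import apache_beam as beam
from apache_beam.io.gcp.pubsub import ReadFromPubSub, WriteToPubSub
from apache_beam.options.pipeline_options import (
    PipelineOptions,
    SetupOptions,
    StandardOptions,
)
from transform_messages import TransformMessages


def run(argv=None):
    pipeline_options = PipelineOptions(argv)
    pipeline_options.view_as(StandardOptions).streaming = True

    # The important change: set save_main_session on the SetupOptions view.
    pipeline_options.view_as(SetupOptions).save_main_session = True

    with beam.Pipeline(options=pipeline_options) as p:
        (p
         | 'ReadFromPubSub' >> ReadFromPubSub(topic='projects/dev-payment-analytics/topics/mysql.dataflow.PAYMENT')
         | 'TransformMessages' >> beam.ParDo(TransformMessages())
         | 'WriteToPubSub' >> WriteToPubSub(topic='projects/dev-payment-analytics/topics/dataflow_bigquery_push'))


if __name__ == '__main__':
    run()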


Best regards
David Regalado
Web | Linkedin | Cloudskillsboost