I’ve been trying to launch a Dataflow job with Flex Templates using the Python SDK. The job starts and then fails with the error ModuleNotFoundError: No module named 'src'.
What am I missing? I don’t want to move the src.processors code into the main Python file (e_commerce_batch.py) because that would make that file less readable.
The error message “sales_target_parse is not defined” indicates that the Dataflow runner cannot find the sales_target_parse function. This is because the src/processors/functions.py module is not imported in the e_commerce_batch.py file.
To fix this error, you need to import the src/processors/functions.py module in the e_commerce_batch.py file. You can do this by adding the following line to the top of the e_commerce_batch.py file:
from src.processors.functions import sales_target_parse
Once you have imported the src/processors/functions.py module, you will be able to access the sales_target_parse function in the e_commerce_batch.py file.
Here is an example of how to import the src/processors/functions.py module in the e_commerce_batch.py file:
from src.processors.functions import sales_target_parse

def main():
    # ...
    sales_target_parse(data)

if __name__ == '__main__':
    main()
Once you have made this change, you should be able to run your Dataflow job without errors.
It is possible that there is something missing in your Dockerfile that is preventing Python from finding the sales_target_parse function.
Here are a few things to check:
Include the src Directory: Make sure that the src directory is included in the COPY statement in your Dockerfile.
Function Definition: Ensure that the sales_target_parse function is defined in a Python file that is included in the COPY statement. Also make sure it is defined at module level and not under an if __name__ == "__main__": block or inside some other conditional, which would prevent it from being imported (see the sketch after this list).
Exporting the Function: In Python, module-level functions are importable by default. Just confirm that nothing conditionally hides or replaces the function before it can be imported.
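For reference, here is a minimal sketch of how src/processors/functions.py could be laid out so the function stays importable. The signature and body are placeholders, since I don’t know your actual implementation:

# src/processors/functions.py

def sales_target_parse(record):
    """Placeholder parsing logic; replace with your real implementation."""
    return record

# Code under this guard only runs when the file is executed directly;
# the function above remains importable either way.
if __name__ == "__main__":
    print(sales_target_parse({"example": "record"}))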
Here is an example of a Dockerfile that you can use to build a Python image that includes the sales_target_parse function:
FROM python:3.10-slim
WORKDIR /app
# Install the dependencies first so this layer is cached between builds.
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
# Copy the whole project, including the src package, into the image.
COPY . .
CMD ["python", "e_commerce_batch.py"]
Note: The CMD directive specifies the default command to run when the container starts. If your intention is to run a Dataflow job, you might need a different command.
If you are still having problems, you can try adding the following line to your Dockerfile:
ENV PYTHONPATH=/app:$PYTHONPATH
This ensures that when Python resolves an import it also looks in /app, the directory that contains your src package, so that from src.processors.functions import sales_target_parse can be found.
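If you want to double-check what the interpreter actually sees inside the container, a quick throwaway debugging line (just a suggestion, not something the template requires) is to log sys.path before the pipeline is built:

import logging
import sys

# Temporary debugging aid: log the import search path at startup so you can
# confirm that the directory containing the src package is present.
logging.info("sys.path: %s", sys.path)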
Once you have updated your Dockerfile, you can rebuild your image and run it again.
Here is my updated Dockerfile:
FROM gcr.io/dataflow-templates-base/python3-template-launcher-base
ARG WORKDIR=/template
RUN mkdir -p ${WORKDIR}
RUN mkdir -p ${WORKDIR}/src
RUN mkdir -p ${WORKDIR}/src/processors
WORKDIR ${WORKDIR}
ARG PYTHON_PY_FILE=e_commerce_batch.py
COPY src /src
COPY . .
ENV PYTHONPATH ${WORKDIR}
ENV PYTHONPATH=/src:$PYTHONPATH
ENV PYTHONPATH=/src/processors:$PYTHONPATH
ENV FLEX_TEMPLATE_PYTHON_PY_FILE="${WORKDIR}/${PYTHON_PY_FILE}"
ENV FLEX_TEMPLATE_PYTHON_REQUIREMENTS_FILE="${WORKDIR}/requirements.txt"
ENV FLEX_TEMPLATE_PYTHON_SETUP_FILE="${WORKDIR}/setup.py"
# We could get rid of installing libffi-dev and git, or we could leave them.
RUN apt-get update \
    # Upgrade pip and install the requirements.
    && pip install --upgrade pip \
    && pip install --no-cache-dir --upgrade pip \
    && pip download --dest /tmp/dataflow-requirements-cache -r $FLEX_TEMPLATE_PYTHON_REQUIREMENTS_FILE
# Since we already downloaded all the dependencies, there's no need to rebuild everything.
ENV PIP_NO_DEPS=True
ENTRYPOINT ["/opt/google/dataflow/python_template_launcher"]
I’m glad to hear that you’ve made some progress. It sounds like you’re almost there!
The error message “sales_target_parse is not defined” still indicates that the Dataflow runner cannot find the sales_target_parse function. This could be due to a few reasons:
The src/processors/functions.py module might not be in the PYTHONPATH environment variable.
There could be an issue with how the sales_target_parse function is being imported or used.
To troubleshoot this issue:
Check the PYTHONPATH: Ensure that the PYTHONPATH in your Dockerfile or the environment configuration of your Dataflow job includes the src/processors/ directory.
Exporting Functions: If you’re using wildcard imports (from module import *), ensure that the src/processors/functions.py file has the following line:
__all__ = ['sales_target_parse']
This ensures that the sales_target_parse function is available for wildcard imports. However, if you’re importing the function directly, this step might not be necessary; the sketch below contrasts the two import styles.
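To make the distinction concrete, here is a small sketch of the two import styles as they might appear in e_commerce_batch.py:

# Direct import: works regardless of whether __all__ is defined.
from src.processors.functions import sales_target_parse

# Wildcard import: only brings in names listed in __all__ (or, if __all__ is
# absent, every name that does not start with an underscore).
from src.processors.functions import *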
Once you’ve checked these points, try running your Dataflow job again.
Additional Tips:
Consider running the Dataflow job with a single worker for debugging purposes (a sketch of how to pin the worker count follows after this list). This can help identify if specific workers are having trouble.
Review the Dataflow job logs for any other errors or warnings that might provide more insights.
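On the first tip, here is a minimal sketch of pinning the job to a single worker through the standard Beam worker options, assuming you configure options in code (the equivalent command-line flags are --num_workers and --max_num_workers):

from apache_beam.options.pipeline_options import PipelineOptions, WorkerOptions

options = PipelineOptions()
worker_options = options.view_as(WorkerOptions)
# Keep the job on one worker while debugging import problems.
worker_options.num_workers = 1
worker_options.max_num_workers = 1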
That’s fantastic news! I’m glad you found the solution.
The save_main_session option is indeed crucial when using Apache Beam (and by extension, Dataflow) with Python. It ensures that the main session’s global context is pickled and made available to the workers, allowing them to access global variables, modules, and functions defined in the main module. Without this, certain dependencies and modules might not be available to the workers, leading to the kind of errors you were seeing.
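For anyone landing here later, this is a minimal sketch of how the option is typically set in the Beam Python SDK; the pipeline shape and names are illustrative, not the asker’s actual code:

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions, SetupOptions

from src.processors.functions import sales_target_parse  # assumed project layout


def run(argv=None):
    options = PipelineOptions(argv)
    # Pickle the main session so that imports and functions defined at module
    # level in this file are also available on the Dataflow workers.
    options.view_as(SetupOptions).save_main_session = True

    with beam.Pipeline(options=options) as pipeline:
        (
            pipeline
            | "Create" >> beam.Create(["raw record"])  # placeholder input
            | "Parse" >> beam.Map(sales_target_parse)
        )


if __name__ == "__main__":
    run()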
Thank you for sharing the solution and the reference link. It will be beneficial for others who might encounter a similar issue.