I created a basic pipeline and ran it from both managed notebooks and instances in Workbench, but even this basic pipeline fails with the following error:
The DAG failed because some tasks failed. The failed tasks are: [concat].; Job (project_id = practice-training, job_id = 7480518563480993792) is failed due to the above error.; Failed to handle the job: {project_number = 385236764312, job_id = 7480518563480993792}.
The error on the node says -
com.google.cloud.ai.platform.common.errors.AiPlatformException: code=RESOURCE_EXHAUSTED, message=The following quota metrics exceed quota limits: aiplatform.googleapis.com/custom_model_training_cpus, cause=null; Failed to create custom job for the task. Task: Project number: 385236764312, Job id: 7480518563480993792, Task id: 6516721854944641024, Task name: concat, Task state: DRIVER_SUCCEEDED, Execution name: projects/385236764312/locations/asia-south1/metadataStores/default/executions/11616092682586157127; Failed to create external task or refresh its state. Task:Project number: 385236764312, Job id: 7480518563480993792, Task id: 6516721854944641024, Task name: concat, Task state: DRIVER_SUCCEEDED, Execution name: projects/385236764312/locations/asia-south1/metadataStores/default/executions/11616092682586157127; Failed to handle the pipeline task. Task: Project number: 385236764312, Job id: 7480518563480993792, Task id: 6516721854944641024, Task name: concat, Task state: DRIVER_SUCCEEDED, Execution name: projects/385236764312/locations/asia-south1/metadataStores/default/executions/11616092682586157127
Yet it is just a two-line component performing a simple string concatenation.
Please help. I am not working in any organisation through which I could reach Google Support, nor can I afford it myself.
My code:
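For anyone skimming the long stack trace, the part that matters is the quota metric named after "quota limits:". A minimal sketch (plain Python, no Google Cloud calls; the variable names are illustrative) that pulls it out of the message text:

```python
import re

# A trimmed copy of the error message from the failed node.
error_message = (
    "code=RESOURCE_EXHAUSTED, message=The following quota metrics exceed "
    "quota limits: aiplatform.googleapis.com/custom_model_training_cpus, "
    "cause=null"
)

# The quota metric is the token after "quota limits:" and before the comma.
match = re.search(r"quota limits:\s*([\w./]+)", error_message)
metric = match.group(1) if match else None
print(metric)  # aiplatform.googleapis.com/custom_model_training_cpus
```

The extracted metric is the quota to look up under IAM & Admin > Quotas for the region the pipeline runs in (asia-south1 here).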
!pip install google-cloud-aiplatform==1.37.0 --upgrade
!pip install google-cloud-pipeline-components==2.6.0 --upgrade
!pip install kfp==2.4.0 --upgrade
import kfp
from typing import NamedTuple
from kfp.dsl import pipeline
from kfp.dsl import component
from kfp import compiler
from google.cloud import aiplatform
PROJECT_ID = "practice-training"
PIPELINE_ROOT = "gs://vertexai-test-bucket-1234"
aiplatform.init(project=PROJECT_ID, location="asia-south1")
# Create components
@component(base_image="python:3.12")
def concat(a: str, b: str) -> str:
    # logging.info(f"Concatenating '{a}' and '{b}' resulted in: '{a+b}'")
    return a + b

compiler.Compiler().compile(concat, "concat.yaml")
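Since the component body is plain Python, its logic can be sanity-checked locally before Vertex AI is involved at all. A hedged sketch, using a local stand-in function (the name `concat_local` is illustrative, not part of the KFP API):

```python
# Local stand-in mirroring the body of the concat component.
def concat_local(a: str, b: str) -> str:
    return a + b

print(concat_local("stres", "sed"))  # stressed
```

If this works locally, the failure is in the execution environment (here, the quota), not in the component logic.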
@component(base_image="python:3.12")
# An earlier version returned a dict:
# def reverse(a: str) -> dict:
#     return {"before": a, "after": a[::-1]}
def reverse(a: str) -> NamedTuple("outputs", [("before", str), ("after", str)]):
    return a, a[::-1]
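The NamedTuple return annotation means the component exposes two named string outputs, `before` and `after`. A stdlib-only sketch of the same semantics (the names `Outputs` and `reverse_local` are illustrative):

```python
from typing import NamedTuple

# Mirrors the component's return annotation.
Outputs = NamedTuple("Outputs", [("before", str), ("after", str)])

def reverse_local(a: str) -> Outputs:
    return Outputs(before=a, after=a[::-1])

result = reverse_local("stressed")
print(result.before, result.after)  # stressed desserts
```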
# Create pipeline
@pipeline(
    name="basic-pipeline-2",
    pipeline_root=PIPELINE_ROOT,
    description="My First Pipeline",
)
def basic_pipeline(x: str = "stres", y: str = "sed"):  # two pipeline parameters
    concat_task = concat(a=x, b=y)  # pipeline parameters are the inputs of the first component
    reverse_task = reverse(a=concat_task.output)  # output of the first component is the input of the second
compiler.Compiler().compile(
    pipeline_func=basic_pipeline, package_path="basic_pipeline-2.json")
The pipeline specification is created as a JSON file. Next, build a pipeline job to run the pipeline, either through the API or by uploading the pipeline JSON file in the Vertex AI UI.
from google.cloud.aiplatform import pipeline_jobs

job = aiplatform.PipelineJob(
    display_name="basic-pipeline-2",
    template_path="basic_pipeline-2.json",
    parameter_values={"x": "stres", "y": "sed"},
    enable_caching=False,
)
job.run(sync=False)
Please help!