I want to query a deployed Matching Engine (Vector Search) index from Go. I am running Go 1.22.5 and using cloud.google.com/go/aiplatform version 1.68.0.
Here is my full implementation:
package main
import (
"fmt"
"context"
"regexp"
aiplatformpb "cloud.google.com/go/aiplatform/apiv1/aiplatformpb"
aiplatform "cloud.google.com/go/aiplatform/apiv1"
"google.golang.org/api/option"
"google.golang.org/protobuf/types/known/structpb"
)
// embedTexts takes a project ID, a location, and a slice of texts, and
// returns a slice of float32 vectors plus an error. main uses element 0 of the
// result as the embedding of its single query string.
//
// The body is elided (`...`) in this listing.
// NOTE(review): presumably it returns one vector per input text, in input
// order — confirm against the real implementation.
func embedTexts(project, location string, texts []string)([][]float32, error) {
...
}
// main embeds a single query string, builds a FindNeighbors request restricted
// to one namespace, sends it through a MatchClient, and prints the IDs of the
// neighbors that come back.
//
// Every step that can fail stops the program after printing the error: the
// later steps index embeddings[0] and call methods on the client, both of
// which would panic if an earlier step had failed.
func main() {
	ctx := context.Background()
	queries := []string{"Some text to be used as a query for nearest neighbors"}
	projectID := "<project id>"
	location := "us-central1"

	// Get text embeddings for query.
	embeddings, err := embedTexts(projectID, location, queries)
	if err != nil {
		fmt.Println(err.Error())
		return
	}
	if len(embeddings) == 0 {
		// Guard the embeddings[0] accesses below.
		fmt.Println("embedTexts returned no embeddings")
		return
	}
	fmt.Println("The length of the embedding vector is:")
	fmt.Println(len(embeddings[0]))

	// Endpoint the vector search client talks to.
	// NOTE(review): FindNeighbors is served by the index endpoint itself (its
	// public domain name, or its private gRPC address for VPC deployments).
	// The regional API host used here appears to be what produces the
	// "Unimplemented" error — confirm the right address for your deployment.
	apiEndpoint := fmt.Sprintf("%s-aiplatform.googleapis.com:443", location)

	// Initialize matching engine client.
	client, err := aiplatform.NewMatchClient(ctx, option.WithEndpoint(apiEndpoint))
	if err != nil {
		fmt.Println(err.Error())
		return
	}
	// Register cleanup as soon as the client exists so every later return
	// path releases the connection.
	defer client.Close()
	fmt.Println("Vector Search Client successfully initialized!")

	// 1. Restriction: only datapoints whose namespace value is allow-listed.
	restriction := &aiplatformpb.IndexDatapoint_Restriction{
		Namespace: "<some namespace>",
		AllowList: []string{"<some value to allow for namespace>"},
	}

	// 2. Query datapoint carrying the embedding and the restriction.
	queryDatapoint := &aiplatformpb.IndexDatapoint{
		DatapointId:   "<Placeholder>",
		FeatureVector: embeddings[0],
		Restricts:     []*aiplatformpb.IndexDatapoint_Restriction{restriction},
	}

	// 3. Query wrapper: how many neighbors to return for the datapoint.
	query := &aiplatformpb.FindNeighborsRequest_Query{
		Datapoint:     queryDatapoint,
		NeighborCount: 50,
	}

	// 4. Full nearest neighbors request.
	req := &aiplatformpb.FindNeighborsRequest{
		IndexEndpoint:       "projects/<project number>/locations/us-central1/indexEndpoints/<vector search index endpoint id>",
		DeployedIndexId:     "<vector search index id>",
		Queries:             []*aiplatformpb.FindNeighborsRequest_Query{query},
		ReturnFullDatapoint: true,
	}

	// Send request to vector search.
	resp, err := client.FindNeighbors(ctx, req)
	if err != nil {
		fmt.Println("Failing at vector search request time.")
		fmt.Println(err.Error())
		return
	}
	fmt.Println("Vector Search Request successfully sent!")

	// Each NearestNeighbors entry corresponds to one query; its Id is the
	// query datapoint's ID. The matches themselves are in Neighbors.
	for _, result := range resp.GetNearestNeighbors() {
		for _, neighbor := range result.GetNeighbors() {
			fmt.Printf("Neighbor ID:%s\n", neighbor.GetDatapoint().GetDatapointId())
		}
	}
}
Everything here compiles properly. The text embedding prediction client runs properly, and the matching engine client initializes successfully. The issue arises when I submit the API request to query from the vector search index. The above code gives me the following error:
rpc error: code = Unimplemented desc = Operation is not implemented, or supported, or enabled.
This seems to indicate that the FindNeighbors method is not actually implemented in the Go Vertex AI SDK, because I am able to query from my vector search indexes properly via Python, so I know that it’s not an endpoint issue.
My question here is: Am I doing something incorrectly? Or is Go’s SDK not up to speed on querying from a Vector Search index endpoint?
Update:
I have figured out that us-central1-aiplatform.googleapis.com:443 is not the correct endpoint. My vector search index is within a VPC network, therefore, the private service access requires a different endpoint.
Here is the new implementation.
package main
import (
"fmt"
"context"
"regexp"
aiplatformpb "cloud.google.com/go/aiplatform/apiv1/aiplatformpb"
aiplatform "cloud.google.com/go/aiplatform/apiv1"
"google.golang.org/api/option"
grpc "google.golang.org/grpc"
insecure "google.golang.org/grpc/credentials/insecure"
"google.golang.org/protobuf/types/known/structpb"
)
// embedTexts invokes the text embedding model. It takes a project ID, a
// location, and a slice of texts, and returns a slice of float32 vectors plus
// an error. main uses element 0 of the result as the embedding of its single
// query string.
//
// The body is elided (`...`) in this listing.
// NOTE(review): presumably it returns one vector per input text, in input
// order — confirm against the real implementation.
func embedTexts(
project, location string, texts []string) ([][]float32, error) {
...
}
// NOTE:
// In order to perform candidate generation, we must do the following:
// 0. Build struct to form request body of text embedding model
// 1. Call text embedding model to convert a string to a dense feature vector
// 2. Build the structs to form the request for the nearest neighbors search
// 3. Call vector search index
//////// 1. Make API request to text embedding model
// in main method
// main embeds a single query string, dials the private gRPC address of the
// index, builds a FindNeighbors request restricted by namespace, sends it
// through a MatchClient bound to that connection, and prints the IDs of the
// neighbors that come back.
//
// Every step that can fail stops the program after printing the error: the
// later steps index embeddings[0] and use the connection and client, which
// would panic or misbehave if an earlier step had failed.
func main() {
	ctx := context.Background()
	queries := []string{"<query contents>"}
	projectID := "<project id>"
	location := "us-central1"

	//////// 2. Call Text Embeddings
	embeddings, err := embedTexts(projectID, location, queries)
	if err != nil {
		fmt.Println(err.Error())
		return
	}
	if len(embeddings) == 0 {
		// Guard the embeddings[0] accesses below.
		fmt.Println("embedTexts returned no embeddings")
		return
	}
	fmt.Println("The length of the embedding vector is:")
	fmt.Println(len(embeddings[0]))

	// The index is reached over Private Service Access, so the request is
	// routed to the internal gRPC address instead of a public API host.
	// grpc.NewClient does not connect eagerly; a dial failure such as
	// "connection refused" only surfaces on the first RPC.
	// NOTE(review): "connection refused" is a TCP-level failure, i.e. nothing
	// accepted the connection at this address from where the program runs.
	// Confirm the program runs inside the peered VPC and that the address
	// matches the deployed index's private match gRPC address.
	grpcConn, err := grpc.NewClient("10.25.0.5:10000", grpc.WithTransportCredentials(insecure.NewCredentials()))
	if err != nil {
		fmt.Println(err.Error())
		return
	}
	fmt.Println("Successfully connected to gRPC endpoint.")

	// Initialize matching engine client on top of the existing connection.
	client, err := aiplatform.NewMatchClient(ctx, option.WithGRPCConn(grpcConn))
	if err != nil {
		fmt.Println(err.Error())
		return
	}
	defer client.Close()
	fmt.Println("Vector Search Client successfully initialized!")

	//////// 3. Create Request Body for Vector Search
	// 1. Restriction: allow and deny lists for one namespace.
	restriction := &aiplatformpb.IndexDatapoint_Restriction{
		Namespace: "<namespace>",
		AllowList: []string{<allowed namespace values>},
		DenyList: []string{<denied namespace values>},
	}

	// 2. Query datapoint carrying the embedding and the restriction.
	queryDatapoint := &aiplatformpb.IndexDatapoint{
		DatapointId:   "Placeholder",
		FeatureVector: embeddings[0],
		Restricts:     []*aiplatformpb.IndexDatapoint_Restriction{restriction},
	}

	// 3. Query wrapper: how many neighbors to return for the datapoint.
	query := &aiplatformpb.FindNeighborsRequest_Query{
		Datapoint:     queryDatapoint,
		NeighborCount: 10,
	}

	// 4. Full nearest neighbors request.
	req := &aiplatformpb.FindNeighborsRequest{
		IndexEndpoint:       "projects/<project number>/locations/us-central1/indexEndpoints/<index endpoint id>",
		DeployedIndexId:     "<deployed_index_endpoint_name>_<index id>",
		Queries:             []*aiplatformpb.FindNeighborsRequest_Query{query},
		ReturnFullDatapoint: false,
	}

	// Send request to vector search. The response is not meaningful when err
	// is non-nil, so only the error is reported.
	resp, err := client.FindNeighbors(ctx, req)
	if err != nil {
		fmt.Println("Failing at vector search request time.")
		fmt.Println(err.Error())
		return
	}
	fmt.Println("Vector Search Request successfully sent!")

	// Each NearestNeighbors entry corresponds to one query; its Id is the
	// query datapoint's ID. The matches themselves are in Neighbors.
	for _, result := range resp.GetNearestNeighbors() {
		for _, neighbor := range result.GetNeighbors() {
			fmt.Printf("Neighbor ID:%s\n", neighbor.GetDatapoint().GetDatapointId())
		}
	}
}
I realize that I have to somehow route the request to the gRPC host when initializing the matching engine client.
That said, when I route the request to the Matching Engine gRPC IP, I now get a different error:
rpc error: code = Unavailable desc = connection error: desc = "transport: Error while dialing: dial tcp 10.25.0.5:10000: connect: connection refused"
I am not sure why the connection is being refused here.