com.google.api.gax.rpc.UnknownException: io.grpc.StatusRuntimeException: UNKNOWN: Channel Pipeline:

When I’m trying to execute the Firestore query from dataflow job (apache beam) it’s giving com.google.api.gax.rpc.UnknownException: io.grpc.StatusRuntimeException: UNKNOWN: Channel Pipeline:

It’s working fine with local emulator and standalone java application

Please help me.

addData and getData methods calling from dataflow job

public class PipelineReferenceClientFirestore implements ReferenceClient {

private static final Logger LOGGER = LoggerFactory.getLogger(PipelineReferenceClientFirestore.class);

private String projectId;

private String domain;

private String subCollection;

private Firestore firestore;

private CollectionReference collectionReference;

/**

  • @PARAM id adding the in Firestore to specific documentId
  • @PARAM data it’s having Json data
    */
    @Override
    public void addData(String id, Map<String, Object> data) {
    ApiFuture future = null;
    try {
    PipelineReferenceDocument pipelineReferenceDocument = new PipelineReferenceDocument();
    pipelineReferenceDocument.setDocumentId(id);
    pipelineReferenceDocument.setDomain(domain);
    pipelineReferenceDocument.setCollectionName(subCollection);
    pipelineReferenceDocument.setDocument(data);
    future = collectionReference.document(id).set(pipelineReferenceDocument);
    Timestamp updatedTime = future.get().getUpdateTime();
    if(LOGGER.isInfoEnabled()) {
    LOGGER.info(“updated timestamp : {}”,updatedTime);
    LOGGER.info(“status : {}”,future.isDone());
    }
    } catch (InterruptedException | ExecutionException e) {
    Thread.currentThread().interrupt();
    throw new ReferenceClientException(“Query execution error”, e);
    }

}

/**

  • @PARAM id fetching data from Firestore to specific documentId
  • @return Map<String, Object> having Payload
    */
    @Override
    public Map<String, Object> getData(String id) {
    PipelineReferenceDocument pipelineReferenceDocument = new PipelineReferenceDocument();
    try {
    ApiFuture future = collectionReference.document(id).get();
    DocumentSnapshot document = future.get();
    if (document.exists()) {
    pipelineReferenceDocument = document.toObject(PipelineReferenceDocument.class);
    return pipelineReferenceDocument.getDocument();
    } else {
    throw new ReferenceClientException("documentId not found : "+ id);
    }
    } catch (InterruptedException e) {
    Thread.currentThread().interrupt();
    throw new ReferenceClientException(“Firestore Connectivity Got interrupted”, e);
    } catch (ExecutionException e) {
    throw new ReferenceClientException(“Query execution error”, e);
    }
    }

public static class Builder {
private String projectId;
private String domain;
private String subCollection;

public PipelineReferenceClientFirestore.Builder setDomain(String domain) {
this.domain = domain;
return this;
}

public PipelineReferenceClientFirestore.Builder setProjectId(String projectId) {
this.projectId = projectId;
return this;
}

public PipelineReferenceClientFirestore.Builder setLoadCollection(String subCollection) {
this.subCollection = subCollection;
return this;
}

public synchronized PipelineReferenceClientFirestore build() {
PipelineReferenceClientFirestore result = new PipelineReferenceClientFirestore();
result.projectId = this.projectId;
result.domain = this.domain;
result.subCollection = this.subCollection;
initializeFireStoreClient(result);
initializeCollections(result);
return result;
}

/**

  • Method used to open the client with the already set connection variables.
    */
    private void initializeFireStoreClient(PipelineReferenceClientFirestore client) {
    FirestoreOptions firestoreOptions;
    try {
    if (client.firestore == null) {
    FirestoreOptions.Builder optionsBuilder = FirestoreOptions.newBuilder().setProjectId(client.projectId);
    if (System.getenv().containsKey(“FIRESTORE_EMULATOR_HOST”)) {
    optionsBuilder.setHost(System.getenv().get(“FIRESTORE_EMULATOR_HOST”));
    optionsBuilder.setCredentials(NoCredentials.getInstance());
    client.firestore = optionsBuilder.build().getService();
    } else {
    firestoreOptions = FirestoreOptions.getDefaultInstance().toBuilder()
    .setProjectId(client.projectId)
    .setCredentials(GoogleCredentials.getApplicationDefault())
    .build();
    client.firestore = firestoreOptions.getService();
    }
    }
    } catch (IOException e) {
    throw new ReferenceClientException(“Firestore Connectivity not established”, e);
    }
    }

/**

  • @PARAM client is object for PipelineReferenceClientFirestore class
    */
    private void initializeCollections(PipelineReferenceClientFirestore client) {
    client.collectionReference = client.firestore.collection(PipelineReferenceClientConstants.PIPELINE_COLLECTION).document(PipelineReferenceClientConstants.PIPELINE_DOCUMENT).collection(PipelineReferenceClientConstants.DOMAIN_COLLECTION).document(client.domain).collection(client.subCollection);
    }

}

Hello,

Reading through your Code, it does NOT seem to me that you are using the Firestore in Native Mode Connector for Apache Beam[0]. Is this not a consideration for your use-case? Please let me know if my understanding is wrong.

As highlighted in this article[1], the Firestore in Native Mode connector for Apache Beam makes data processing easier for Firestore users. I think this may be a more appropriate and efficient way, if you are using the Firestore Database in Native Mode. There is even an implementation sample code provided in this link[2].

[0]https://beam.apache.org/releases/javadoc/2.32.0/org/apache/beam/sdk/io/gcp/firestore/FirestoreIO.html
[1]https://cloud.google.com/blog/products/databases/apache-beam-firestore-connector-released
[2]https://cloud.google.com/blog/topics/developers-practitioners/using-firestore-and-apache-beam-data-processing