When I execute a Firestore query from a Dataflow job (Apache Beam), it fails with com.google.api.gax.rpc.UnknownException: io.grpc.StatusRuntimeException: UNKNOWN: Channel Pipeline:
The same code works fine against the local emulator and in a standalone Java application.
Please help me.
The addData and getData methods below are the ones called from the Dataflow job:
public class PipelineReferenceClientFirestore implements ReferenceClient {
/** Class-wide SLF4J logger. */
private static final Logger LOGGER = LoggerFactory.getLogger(PipelineReferenceClientFirestore.class);
/** Project id handed to the Firestore options when the client is built. */
private String projectId;
/** Domain name; copied onto every written document and used as a document id in the collection path. */
private String domain;
/** Name of the innermost collection that documents are written to and read from. */
private String subCollection;
/** Firestore service handle; assigned by the nested Builder. */
private Firestore firestore;
/** Collection that addData/getData operate on; assigned by the nested Builder. */
private CollectionReference collectionReference;
/**
- @PARAM id adding the in Firestore to specific documentId
- @PARAM data it’s having Json data
 */
 @Override
 public void addData(String id, Map<String, Object> data) {
 ApiFuture future = null;
 try {
 PipelineReferenceDocument pipelineReferenceDocument = new PipelineReferenceDocument();
 pipelineReferenceDocument.setDocumentId(id);
 pipelineReferenceDocument.setDomain(domain);
 pipelineReferenceDocument.setCollectionName(subCollection);
 pipelineReferenceDocument.setDocument(data);
 future = collectionReference.document(id).set(pipelineReferenceDocument);
 Timestamp updatedTime = future.get().getUpdateTime();
 if(LOGGER.isInfoEnabled()) {
 LOGGER.info(“updated timestamp : {}”,updatedTime);
 LOGGER.info(“status : {}”,future.isDone());
 }
 } catch (InterruptedException | ExecutionException e) {
 Thread.currentThread().interrupt();
 throw new ReferenceClientException(“Query execution error”, e);
 }
}
/**
- @PARAM id fetching data from Firestore to specific documentId
- @return Map<String, Object> having Payload
 */
 @Override
 public Map<String, Object> getData(String id) {
 PipelineReferenceDocument pipelineReferenceDocument = new PipelineReferenceDocument();
 try {
 ApiFuture future = collectionReference.document(id).get();
 DocumentSnapshot document = future.get();
 if (document.exists()) {
 pipelineReferenceDocument = document.toObject(PipelineReferenceDocument.class);
 return pipelineReferenceDocument.getDocument();
 } else {
 throw new ReferenceClientException("documentId not found : "+ id);
 }
 } catch (InterruptedException e) {
 Thread.currentThread().interrupt();
 throw new ReferenceClientException(“Firestore Connectivity Got interrupted”, e);
 } catch (ExecutionException e) {
 throw new ReferenceClientException(“Query execution error”, e);
 }
 }
public static class Builder {
private String projectId;
private String domain;
private String subCollection;
public PipelineReferenceClientFirestore.Builder setDomain(String domain) {
this.domain = domain;
return this;
}
public PipelineReferenceClientFirestore.Builder setProjectId(String projectId) {
this.projectId = projectId;
return this;
}
public PipelineReferenceClientFirestore.Builder setLoadCollection(String subCollection) {
this.subCollection = subCollection;
return this;
}
public synchronized PipelineReferenceClientFirestore build() {
PipelineReferenceClientFirestore result = new PipelineReferenceClientFirestore();
result.projectId = this.projectId;
result.domain = this.domain;
result.subCollection = this.subCollection;
initializeFireStoreClient(result);
initializeCollections(result);
return result;
}
/**
- Method used to open the client with the already set connection variables.
 */
 private void initializeFireStoreClient(PipelineReferenceClientFirestore client) {
 FirestoreOptions firestoreOptions;
 try {
 if (client.firestore == null) {
 FirestoreOptions.Builder optionsBuilder = FirestoreOptions.newBuilder().setProjectId(client.projectId);
 if (System.getenv().containsKey(“FIRESTORE_EMULATOR_HOST”)) {
 optionsBuilder.setHost(System.getenv().get(“FIRESTORE_EMULATOR_HOST”));
 optionsBuilder.setCredentials(NoCredentials.getInstance());
 client.firestore = optionsBuilder.build().getService();
 } else {
 firestoreOptions = FirestoreOptions.getDefaultInstance().toBuilder()
 .setProjectId(client.projectId)
 .setCredentials(GoogleCredentials.getApplicationDefault())
 .build();
 client.firestore = firestoreOptions.getService();
 }
 }
 } catch (IOException e) {
 throw new ReferenceClientException(“Firestore Connectivity not established”, e);
 }
 }
/**
- @PARAM client is object for PipelineReferenceClientFirestore class
 */
 private void initializeCollections(PipelineReferenceClientFirestore client) {
 client.collectionReference = client.firestore.collection(PipelineReferenceClientConstants.PIPELINE_COLLECTION).document(PipelineReferenceClientConstants.PIPELINE_DOCUMENT).collection(PipelineReferenceClientConstants.DOMAIN_COLLECTION).document(client.domain).collection(client.subCollection);
 }
}