I have a similar setup, where I am trying to run a BigQuery query from within a Dataflow streaming job, on an unbounded collection, with positional arguments.
To do this I wrote a BigQuery ParDo that runs the query:
static class BigQueryParDo extends DoFn<Long, TableRow> {

    private final PCollectionView mySideInput;

    // Way to access a side input in a non-anonymous DoFn:
    // https://stackoverflow.com/questions/45463061/access-side-input-in-a-non-anonymous-dofn
    BigQueryParDo(PCollectionView mySideInput) {
        this.mySideInput = mySideInput;
    }

    @ProcessElement
    public void processElement(ProcessContext c) throws Exception {
        BigQuery bigquery = BigQueryOptions.getDefaultInstance().getService();
        LocalDateTime queryingMoment = LocalDateTime.now();
        int minuteOfHour = queryingMoment.getMinuteOfHour();
        // The values below can be used to form the BQ query in a parametrized way
        // (positional parameters) while streaming.
        c.sideInput(mySideInput);
        QueryJobConfiguration queryConfig =
            QueryJobConfiguration.newBuilder(
                "select JSON_EXTRACT_SCALAR(data, '$._id') as prid, "
                    + " TIMESTAMP_SECONDS(SAFE_CAST(SAFE_CAST(JSON_EXTRACT_SCALAR(data, '$.eka_ingested_at') as INT64) / 1000 AS INT64)) as eka_ingested_at,"
                    + " SAFE_CAST(JSON_EXTRACT_SCALAR(data, '$.created_at') AS TIMESTAMP FORMAT 'DD:MM:YYYY,HH24:MI:SS' AT TIME ZONE 'UTC') as created_at,"
                    + " JSON_EXTRACT_SCALAR(data, '$.prescription_url') as prescription_url,"
                    + " JSON_EXTRACT_SCALAR(data, '$.docid') as docid,"
                    + " JSON_EXTRACT_SCALAR(data, '$.flavour') as flavour,"
                    + " JSON_EXTRACT_SCALAR(data, '$.dob') as dob,"
                    + " JSON_EXTRACT_SCALAR(data, '$.patient_id') as patient_id,"
                    + " JSON_EXTRACT_SCALAR(data, '$.dob_timestamp') as dob_timestamp,"
                    + " JSON_EXTRACT_SCALAR(data, '$.patient_gender') as patient_gender,"
                    + " JSON_EXTRACT_SCALAR(data, '$.patient_name') as patient_name,"
                    + " JSON_EXTRACT_SCALAR(data, '$.patient_phone') as patient_phone,"
                    + " JSON_EXTRACT_SCALAR(data, '$.op') as op "
                    + " from `mydataset.pubsub.prescription_latest`")
                .setUseLegacySql(false) // Use standard SQL syntax for queries.
                .build();

        // Create a job ID so that we can safely retry.
        JobId jobId = JobId.of(UUID.randomUUID().toString());
        Job queryJob = bigquery.create(JobInfo.newBuilder(queryConfig).setJobId(jobId).build());

        // Wait for the query to complete.
        queryJob = queryJob.waitFor();

        // Check for errors.
        if (queryJob == null) {
            throw new RuntimeException("Job no longer exists");
        } else if (queryJob.getStatus().getError() != null) {
            throw new RuntimeException(queryJob.getStatus().getError().toString());
        }

        // Get the results.
        QueryResponse response = bigquery.getQueryResults(jobId);
        TableResult result = queryJob.getQueryResults();
        FieldList resultFields = result.getSchema().getFields();
        for (FieldValueList row : result.iterateAll()) {
            TableRow tableRow = new TableRow();
            for (int i = 0; i < row.size(); i++) {
                // Key each cell by its column name from the result schema;
                // FieldValue.toString() is not the column name.
                tableRow.set(resultFields.get(i).getName(), row.get(i).getValue());
            }
            c.output(tableRow);
        }
    }
}
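For clarity, the per-row conversion has to pair each cell with the column name at the same index in the result schema. A minimal, stdlib-only sketch of that mapping shape (the column names and values here are just placeholders, not my real schema):

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class RowMappingSketch {
    // Hypothetical column names standing in for the query result schema.
    static final List<String> COLUMNS = List.of("prid", "patient_id", "op");

    // Pair each cell with the column name at the same index,
    // the way the FieldValueList loop keys the TableRow.
    static Map<String, Object> toRow(List<Object> cells) {
        Map<String, Object> row = new LinkedHashMap<>();
        for (int i = 0; i < cells.size(); i++) {
            row.put(COLUMNS.get(i), cells.get(i));
        }
        return row;
    }

    public static void main(String[] args) {
        System.out.println(toRow(List.of("p-1", "pat-9", "insert")));
    }
}
```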
…
It is called as below:

// ....mainData comes from above, windowed to turn the unbounded collection into a bounded one
// ....
PCollection<TableRow> bqQueryData = mainData.apply(
    ParDo.of(new BigQueryParDo(newRandomNumber))
        .withSideInput("Side Input to BQ Query", newRandomNumber));

// Do further processing on bqQueryData.
List<TableFieldSchema> fields = new ArrayList<>();
fields.add(new TableFieldSchema().setName("prid").setType("STRING"));
fields.add(new TableFieldSchema().setName("created_at").setType("TIMESTAMP"));
fields.add(new TableFieldSchema().setName("eka_ingested_at").setType("TIMESTAMP"));
fields.add(new TableFieldSchema().setName("prescription_url").setType("STRING"));
fields.add(new TableFieldSchema().setName("doc_id").setType("STRING"));
fields.add(new TableFieldSchema().setName("flavour").setType("STRING"));
fields.add(new TableFieldSchema().setName("dob").setType("STRING"));
fields.add(new TableFieldSchema().setName("patient_id").setType("STRING"));
fields.add(new TableFieldSchema().setName("dob_timestamp").setType("STRING"));
fields.add(new TableFieldSchema().setName("patient_gender").setType("STRING"));
fields.add(new TableFieldSchema().setName("patient_name").setType("STRING"));
fields.add(new TableFieldSchema().setName("patient_phone").setType("STRING"));
fields.add(new TableFieldSchema().setName("op").setType("STRING"));
TableSchema schema = new TableSchema().setFields(fields);

bqQueryData.apply("Write To BQ", BigQueryIO
    .writeTableRows()
    .withMethod(BigQueryIO.Write.Method.STREAMING_INSERTS)
    .withSchema(schema)
    .to(options.getTable())
    .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
    .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND));

p.run();
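As a side note on the two timestamp expressions in the query: eka_ingested_at converts a millisecond-epoch string down to whole seconds, and created_at parses a 'DD:MM:YYYY,HH24:MI:SS' string as UTC. A stdlib-only java.time sketch of both conversions (the sample values are made up):

```java
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;

public class TimestampSketch {
    // Mirrors TIMESTAMP_SECONDS(SAFE_CAST(SAFE_CAST(v AS INT64) / 1000 AS INT64)):
    // BigQuery's "/" on INT64 yields FLOAT64 and the cast rounds, so round here too.
    static Instant ekaIngestedAt(String epochMillis) {
        return Instant.ofEpochSecond(Math.round(Long.parseLong(epochMillis) / 1000.0));
    }

    // Mirrors CAST(v AS TIMESTAMP FORMAT 'DD:MM:YYYY,HH24:MI:SS' AT TIME ZONE 'UTC'):
    // day:month:year,hour:minute:second, interpreted as UTC.
    static Instant createdAt(String formatted) {
        DateTimeFormatter fmt = DateTimeFormatter.ofPattern("dd:MM:yyyy,HH:mm:ss");
        return LocalDateTime.parse(formatted, fmt).toInstant(ZoneOffset.UTC);
    }

    public static void main(String[] args) {
        System.out.println(ekaIngestedAt("1620000000123")); // whole-second instant
        System.out.println(createdAt("25:12:2021,14:05:09"));
    }
}
```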
Another important observation: if I replace mydataset with a SQL query against any public dataset, it is able to get data from BigQuery.
I am also providing a service account while running the Dataflow job.
I am getting the error below (the dataset mydataset is anonymised):
Error message from worker: com.google.cloud.bigquery.BigQueryException: Not found: Job mydataset:0694cb64-3977-406f-a207-6ee9f215a972
com.google.cloud.bigquery.spi.v2.HttpBigQueryRpc.translate(HttpBigQueryRpc.java:114)
com.google.cloud.bigquery.spi.v2.HttpBigQueryRpc.getQueryResults(HttpBigQueryRpc.java:702)
com.google.cloud.bigquery.BigQueryImpl$36.call(BigQueryImpl.java:1441)
com.google.cloud.bigquery.BigQueryImpl$36.call(BigQueryImpl.java:1436)
com.google.api.gax.retrying.DirectRetryingExecutor.submit(DirectRetryingExecutor.java:103)
com.google.cloud.bigquery.BigQueryRetryHelper.run(BigQueryRetryHelper.java:86)
com.google.cloud.bigquery.BigQueryRetryHelper.runWithRetries(BigQueryRetryHelper.java:49)
com.google.cloud.bigquery.BigQueryImpl.getQueryResults(BigQueryImpl.java:1435)
com.google.cloud.bigquery.BigQueryImpl.getQueryResults(BigQueryImpl.java:1419)
orbi.prescriptionwindow.window.RXMonetize$BigQueryParDo.processElement(RXMonetize.java:85)
Caused by: com.google.api.client.googleapis.json.GoogleJsonResponseException: 404 Not Found GET https://www.googleapis.com/bigquery/v2/projects/mydataset/queries/0694cb64-3977-406f-a207-6ee9f215a972?prettyPrint=false
{
  "code": 404,
  "errors": [
    {
      "domain": "global",
      "message": "Not found: Job mydataset:0694cb64-3977-406f-a207-6ee9f215a972",
      "reason": "notFound"
    }
  ],
  "message": "Not found: Job mydataset:0694cb64-3977-406f-a207-6ee9f215a972",
  "status": "NOT_FOUND"
}