Hello ms4446!
I really liked your solution, but I am having some problems applying it:
When I ran it as in the sample below, I got the following error:
```python
with beam.Pipeline(options=pipeline_options) as pipeline:
    dt_start_date = (
        pipeline
        | "ReadStartDate" >> beam.io.ReadFromBigQuery(
            query="""
                SELECT
                    MAX(dt_start_date),
                FROM
                    `my_dataset.my_table`
            """,
            use_standard_sql=True,
            temp_dataset=DatasetReference(
                projectId=beam_options["project"],
                datasetId="ti_materializacao_views_tmp",
            )
        )
        | "ExtractDate" >> beam.Map(lambda row: row['dt_start_date'])
    )
    get_events = (
        pipeline
        | "GetEvents" >> beam.io.ReadFromBigQuery(
            query=lambda date: f"""
                SELECT
                    *
                FROM
                    `my_dataset.my_table`
                WHERE
                    DATE(column) >= {date}
                LIMIT 1
            """,
            use_standard_sql=True,
            temp_dataset=DatasetReference(
                projectId=beam_options["project"],
                datasetId="ti_materializacao_views_tmp",
            ),
        ).with_side_inputs(beam.pvalue.AsSingleton(dt_start_date))
    )
```
```
2024-11-19 12:45:15.443 WARNING ('No iterator is returned by the process method in %s.', <class 'apache_beam.io.gcp.bigquery_read_internal._PassThroughThenCleanup.expand.<locals>.RemoveExtractedFiles'>)
Traceback (most recent call last):
  File "/usr/lib/python3.10/runpy.py", line 196, in _run_module_as_main
    return _run_code(code, main_globals, None,
  File "/usr/lib/python3.10/runpy.py", line 86, in _run_code
    exec(code, run_globals)
  File "/home/pipeline_teste.py", line 77, in <module>
    ).with_side_inputs(beam.pvalue.AsSingleton(dt_start_date))
AttributeError: 'ReadFromBigQuery' object has no attribute 'with_side_inputs'. Did you mean: 'side_inputs'?
```
Then I replaced `with_side_inputs()` with `side_inputs()`, as the message suggested, but now I get a different error:
```python
get_events = (
    pipeline
    | "GetEvents" >> beam.io.ReadFromBigQuery(
        query=lambda date: f"""
            SELECT
                *
            FROM
                `my_dataset.my_table`
            WHERE
                DATE(column) >= {date}
            LIMIT 1
        """,
        use_standard_sql=True,
        temp_dataset=DatasetReference(
            projectId=beam_options["project"],
            datasetId="ti_materializacao_views_tmp",
        ),
    ).side_inputs(beam.pvalue.AsSingleton(dt_start_date))
)
```
```
2024-11-19 12:50:18.705 WARNING ('No iterator is returned by the process method in %s.', <class 'apache_beam.io.gcp.bigquery_read_internal._PassThroughThenCleanup.expand.<locals>.RemoveExtractedFiles'>)
Traceback (most recent call last):
  File "/usr/lib/python3.10/runpy.py", line 196, in _run_module_as_main
    return _run_code(code, main_globals, None,
  File "/usr/lib/python3.10/runpy.py", line 86, in _run_code
    exec(code, run_globals)
  File "/home/pipeline_teste.py", line 77, in <module>
    ).side_inputs(beam.pvalue.AsSingleton(dt_start_date))
TypeError: 'tuple' object is not callable
```
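A possible explanation for the second error, sketched in plain Python (no Beam needed): Python's "Did you mean" hint only points to an existing attribute with a similar name, not to a method with the same behavior. In Beam's `PTransform` base class, `side_inputs` appears to be a plain tuple attribute (defaulting to `()`), so `.side_inputs(...)` ends up trying to call a tuple. The class below is a hypothetical stand-in that mimics only that one attribute; it is not Beam's actual code.

```python
class HypotheticalTransform:
    """Stand-in for a transform class, NOT Beam's real PTransform.

    Assumption: like Beam's PTransform base class, it exposes
    `side_inputs` as a plain tuple attribute rather than a method.
    """

    side_inputs = ()  # a tuple attribute; calling it raises TypeError


def try_call(transform, method_name, arg):
    """Call transform.<method_name>(arg) and return the error text, or None."""
    try:
        getattr(transform, method_name)(arg)
    except (AttributeError, TypeError) as exc:
        return f"{type(exc).__name__}: {exc}"
    return None


t = HypotheticalTransform()
# First error: the attribute does not exist at all.
print(try_call(t, "with_side_inputs", "date"))
# Second error: the attribute exists, but it is a tuple, not a callable.
print(try_call(t, "side_inputs", "date"))  # TypeError: 'tuple' object is not callable
```

In other words, the two errors are consistent with `ReadFromBigQuery` simply not offering a way to attach a side input to its query.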
I don't understand what is happening. Could you help me?
I am using apache-beam version 2.60.0.
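Two further points about the query itself, hedged. First, the `ReadFromBigQuery` documentation describes `query` as a string (or `ValueProvider`), so a lambda there is probably never called with the date. Second, even with a real date, the f-string inserts `{date}` without quotes, so BigQuery would likely read `2024-11-19` as integer subtraction rather than a date. (Also, `MAX(dt_start_date)` has no alias, so BigQuery would name that column something like `f0_`, and `row['dt_start_date']` would likely fail; `MAX(dt_start_date) AS dt_start_date` avoids that.) A minimal sketch of building the query string with a proper `DATE` literal, assuming the start date has already been obtained as a `datetime.date` (for example by running the `MAX` query before the pipeline is constructed). `build_events_query` is a hypothetical helper; `my_dataset.my_table` and `column` are the placeholders from the post.

```python
import datetime


def build_events_query(start_date: datetime.date,
                       table: str = "my_dataset.my_table") -> str:
    """Return a GoogleSQL query selecting rows on or after start_date.

    Hypothetical helper; the table and `column` are placeholders.
    """
    # isoformat() yields YYYY-MM-DD, the format a DATE literal expects,
    # and the quotes make BigQuery parse it as a date, not arithmetic.
    return (
        "SELECT * "
        f"FROM `{table}` "
        f"WHERE DATE(column) >= DATE '{start_date.isoformat()}' "
        "LIMIT 1"
    )


query = build_events_query(datetime.date(2024, 11, 19))
print(query)
# SELECT * FROM `my_dataset.my_table` WHERE DATE(column) >= DATE '2024-11-19' LIMIT 1
```

The resulting string could then be passed as `query=` to `beam.io.ReadFromBigQuery`. If the date really has to be computed inside the pipeline, Beam's `ReadAllFromBigQuery` transform, which reads from a PCollection of `ReadFromBigQueryRequest` objects, may be worth checking in the Beam docs for 2.60.0.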