java.lang.IllegalArgumentException: PCollection with more than one element accessed as a singleton

I’m hitting the error below when running a GCP Dataflow job. The pipeline has a cron-style trigger that fires every 20 minutes and fetches data from a BigQuery table. With a single entry in the table it gives no error, but with multiple entries it throws the error below. The error does not occur when I run the tests locally. Any suggestions would be much appreciated.

java.lang.IllegalArgumentException: PCollection with more than one element accessed as a singleton view. Consider using Combine.globally().asSingleton() to combine the PCollection into a single value

at org.apache.beam.sdk.transforms.View$SingletonCombineFn.apply ( org/apache.beam.sdk.transforms/View.java:464 )
at org.apache.beam.sdk.transforms.Combine$BinaryCombineFn.addInput ( org/apache.beam.sdk.transforms/Combine.java:528 )
at org.apache.beam.sdk.transforms.Combine$BinaryCombineFn.addInput ( org/apache.beam.sdk.transforms/Combine.java:497 )
at org.apache.beam.runners.dataflow.worker.WindmillStateInternals$WindmillCombiningState.add ( org/apache.beam.runners.dataflow.worker/WindmillStateInternals.java:2067 )
at org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SystemReduceFn.processValue ( org/apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core/SystemReduceFn.java:119 )…

PCollection<Long> cronPCollection = getCronJobPCollection(pipeline, options);
PCollectionView<List<Pixel>> pixels = getPixelsSideInput(cronPCollection, options);

pipeline.apply("Read Events",
        PubsubIO.readMessages().fromSubscription(options.getPurchasesSubscription()))
        .apply("Unpack Purchase Events", ParDo.of(new UnpackPurchaseMessageFn()))
        .apply("Publish Purchase Events",
                ParDo.of(new EventAndSendFn(pixels, sites)).withSideInputs(pixels, sites));

public static PCollection<Long> getCronJobPCollection(Pipeline pipeline,
                                                      PipelineOptions options) {
    return pipeline
            .apply("Cron Trigger",
                    GenerateSequence.from(0).withRate(1,
                            Duration.standardMinutes(options.getMetadataRefreshMinutes())))
            .apply("Cron Window", Window.<Long>into(new GlobalWindows())
                    .triggering(Repeatedly.forever(AfterProcessingTime.pastFirstElementInPane()))
                    .discardingFiredPanes());
}

public static PCollectionView<List<Pixel>> getPixelsSideInput(PCollection<Long> cronPCollection,
                                                              PipelineOptions options) {
    return cronPCollection
            .apply("Read Pixel Configs", ParDo.of(new ReadPixelsFromBQFn(options.getProject())))
            .apply("Transform Pixel Configs", View.asSingleton());
}

static class ReadPixelsFromBQFn extends DoFn<Long, List<Pixel>> {
    private final String project;

    public ReadPixelsFromBQFn(String project) {
        this.project = project;
    }

    @ProcessElement
    public void processElement(@Element Long in, OutputReceiver<List<Pixel>> out) {
        TableResult result = BigQueryUtil.readPixelsFromBigQuery(this.project);
        // BigQueryUtil.convertBQRowsToPixels(result) returns List<Pixel>
        out.output(BigQueryUtil.convertBQRowsToPixels(result));
    }
}

Hello,

As the error message suggests, you could try using Combine.globally().asSingleton() to combine the PCollection into a single value.

The "Java Combine.globally Examples" page might be a good reference.

Thanks for your reply. I’m not sure where exactly to use Combine.globally().asSingleton(), since the globally() method expects one or more arguments. Could you show me how to modify the code below and where exactly the change should go?

public class App {
    private static final Logger LOG = LoggerFactory.getLogger(App.class);

    public static void main(String[] args) {
        EventsHandlerDataflowPipelineOptions options = PipelineOptionsFactory
                .fromArgs(args)
                .withValidation()
                .as(EventsHandlerDataflowPipelineOptions.class);

        Pipeline pipeline = Pipeline.create(options);
        PCollection<Long> cronPCollection = getCronJobPCollection(pipeline, options);
        PCollectionView<List<Pixel>> pixels = getPixelsSideInput(cronPCollection, options);
        PCollectionView<List<Site>> sites = getSitesSideInput(cronPCollection, options);

        pipeline.apply("Read Purchase Events",
                PubsubIO.readMessages().fromSubscription(options.getPurchasesSubscription()))
                .apply("Unpack Purchase Events", ParDo.of(new UnpackPurchaseMessageFn()))
                .apply("Publish Purchase Events",
                        ParDo.of(new ToEventAndSendFn(pixels, sites)).withSideInputs(pixels, sites));

        pipeline.run();
    }

    public static PCollection<Long> getCronJobPCollection(Pipeline pipeline,
                                                          EventsHandlerDataflowPipelineOptions options) {
        return pipeline
                .apply("Cron Trigger",
                        GenerateSequence.from(0).withRate(1,
                                Duration.standardMinutes(options.getMetadataRefreshMinutes())))
                .apply("Cron Window", Window.<Long>into(new GlobalWindows())
                        .triggering(Repeatedly.forever(AfterProcessingTime.pastFirstElementInPane()))
                        .discardingFiredPanes());
    }

    public static PCollectionView<List<Pixel>> getPixelsSideInput(PCollection<Long> cronPCollection,
                                                                  EventsHandlerDataflowPipelineOptions options) {
        return cronPCollection
                .apply("Read Pixel Configs", ParDo.of(new ReadPixelsFromBQFn(options.getProject())))
                .apply("Transform Pixel Configs", View.asSingleton());
    }

    public static PCollectionView<List<Site>> getSitesSideInput(PCollection<Long> cronPCollection,
                                                                EventsHandlerDataflowPipelineOptions options) {
        return cronPCollection
                .apply("Read Site Details", ParDo.of(new ReadSitesFromBQFn(options.getProject())))
                .apply("Transform Site Details", View.asSingleton());
    }

    static class ReadPixelsFromBQFn extends DoFn<Long, List<Pixel>> {
        private final String project;

        public ReadPixelsFromBQFn(String project) {
            this.project = project;
        }

        @ProcessElement
        public void processElement(@Element Long in, OutputReceiver<List<Pixel>> out) {
            TableResult result = BigQueryUtil.readPixelsFromBigQuery(this.project);
            out.output(BigQueryUtil.convertBQRowsToPixels(result));
        }
    }

    static class ReadSitesFromBQFn extends DoFn<Long, List<Site>> {
        private final String project;

        public ReadSitesFromBQFn(String project) {
            this.project = project;
        }

        @ProcessElement
        public void processElement(@Element Long in, OutputReceiver<List<Site>> out) {
            TableResult result = BigQueryUtil.readSitesFromBigQuery(this.project);
            out.output(BigQueryUtil.convertBQRowsToSites(result));
        }
    }

    static class UnpackSessionMessageFn extends DoFn<PubsubMessage, IntermediateEvent> {
        @ProcessElement
        public void processElement(@Element PubsubMessage in, OutputReceiver<IntermediateEvent> out) {
            out.output(new IntermediateEvent(in));
        }
    }

    static class UnpackPurchaseMessageFn extends DoFn<PubsubMessage, IntermediateEvent> {
        @ProcessElement
        public void processElement(@Element PubsubMessage in, OutputReceiver<IntermediateEvent> out) {
            //TODO log should be removed after testing
            out.output(new IntermediateEvent(in, true));
        }
    }

    static class ToEventAndSendFn extends DoFn<IntermediateEvent, Void> {
        private final PCollectionView<List<Pixel>> pixels;
        private final PCollectionView<List<Site>> sites;

        //add constructor with side input so it can be accessed in static class
        public ToEventAndSendFn(PCollectionView<List<Pixel>> pixels, PCollectionView<List<Site>> sites) {
            this.pixels = pixels;
            this.sites = sites;
        }

        @ProcessElement
        public void processElement(@Element IntermediateEvent in, OutputReceiver<Void> out, ProcessContext c) {
            out.output(Transformer.createEventAndSendToApi(in,
                    c.sideInput(this.pixels), c.sideInput(this.sites)));
        }
    }
}
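
On where the combining step goes and what its argument is: the sketch below is a plain-Java model, not Beam code, and the names `SingletonViewModel`, `strictSingleton`, `keepLatest`, and `combinePane` are made up for illustration. It assumes the singleton view folds the elements of a pane pairwise with a binary combiner, which is what the `BinaryCombineFn.addInput` frame in the stack trace suggests. `ReadPixelsFromBQFn` emits one `List<Pixel>` per cron element, so the view would only see a second element when more than one cron element lands in the same pane. That is an inference from the posted code, not something verified on the running job.

```java
import java.util.Arrays;
import java.util.List;
import java.util.function.BinaryOperator;

// Plain-Java model, NOT Beam code. All names here are hypothetical.
final class SingletonViewModel {

    // Models the strict singleton behaviour: combining two elements is an error.
    static <T> BinaryOperator<T> strictSingleton() {
        return (left, right) -> {
            throw new IllegalArgumentException(
                    "PCollection with more than one element accessed as a singleton view.");
        };
    }

    // Models a tolerant combiner: of two elements, keep the one that arrived later.
    static <T> BinaryOperator<T> keepLatest() {
        return (older, newer) -> newer;
    }

    // Folds the elements of one pane left to right with the given combiner.
    static <T> T combinePane(List<T> pane, BinaryOperator<T> combiner) {
        if (pane.isEmpty()) {
            throw new IllegalArgumentException("pane is empty");
        }
        T result = pane.get(0);
        for (T next : pane.subList(1, pane.size())) {
            result = combiner.apply(result, next);
        }
        return result;
    }

    public static void main(String[] args) {
        // Each string stands for the List<Pixel> emitted by one cron element.
        List<String> onePerPane = Arrays.asList("pixels@tick0");
        List<String> twoPerPane = Arrays.asList("pixels@tick0", "pixels@tick1");

        // One element per pane: the strict combiner is never invoked.
        String single = combinePane(onePerPane, SingletonViewModel.<String>strictSingleton());
        System.out.println(single);

        // Two elements per pane: the tolerant combiner reduces them to one value.
        String latest = combinePane(twoPerPane, SingletonViewModel.<String>keepLatest());
        System.out.println(latest);

        // Two elements per pane with the strict combiner reproduces the failure.
        try {
            combinePane(twoPerPane, SingletonViewModel.<String>strictSingleton());
        } catch (IllegalArgumentException e) {
            System.out.println("strict combiner failed: " + e.getMessage());
        }
    }
}
```

In the pipeline itself, the analogous change would be to reduce the output of "Read Pixel Configs" (and "Read Site Details") to a single value just before `View.asSingleton()`. Beam's `Latest.globally()` is one transform that does this. `Combine.globally(fn)` with your own `CombineFn` is the form the error message points at, and the argument it expects is that combining function.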
