Partitioned table loading using a Dataflow job


I want to read a file and write it to a BigQuery partitioned table, based on the date value present in a field of the file. E.g. if the file contains two dates, 25 and 26 July, Dataflow should write the data to two partitions based on the dates present in the file.

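import com.google.api.services.bigquery.model.TableFieldSchema;
import com.google.api.services.bigquery.model.TableRow;
import com.google.api.services.bigquery.model.TableSchema;
import java.util.ArrayList;
import java.util.List;
import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.io.gcp.bigquery.TableDestination;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.ValueInSingleWindow;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class StarterPipeline {
  private static final Logger LOG =
      LoggerFactory.getLogger(StarterPipeline.class);

  public static void main(String[] args) {
    DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class);
    options.setProject("");
    options.setTempLocation("gs://stage_location/");
    Pipeline p = Pipeline.create(options);

    // Schema of the target table; every column is loaded as a string.
    List<TableFieldSchema> fields = new ArrayList<>();
    fields.add(new TableFieldSchema().setName("id").setType("STRING"));
    fields.add(new TableFieldSchema().setName("name").setType("STRING"));
    fields.add(new TableFieldSchema().setName("designation").setType("STRING"));
    fields.add(new TableFieldSchema().setName("joindate").setType("STRING"));
    TableSchema schema = new TableSchema().setFields(fields);

    PCollection<String> read = p.apply("Read Lines", TextIO.read().from("gs://hadoop_source_files/employee.txt"));

    // Split each comma-separated line into a TableRow.
    PCollection<TableRow> rows = read.apply(ParDo.of(new DoFn<String, TableRow>() {
      @ProcessElement
      public void processElement(ProcessContext c) {
        String[] data = c.element().split(",");

        c.output(new TableRow().set("id", data[0]).set("name", data[1]).set("designation", data[2]).set("joindate", data[3]));
      }
    }));

    // Pick the destination per row: the table name plus a "$<joindate>" partition suffix.
    rows.apply(BigQueryIO.writeTableRows().to(new SerializableFunction<ValueInSingleWindow<TableRow>, TableDestination>() {
      public String getDate(String value) {
        return "project:dataset.dataflow_test$" + value;
      }

      @Override
      public TableDestination apply(ValueInSingleWindow<TableRow> value) {
        TableRow row = value.getValue();
        String date = getDate(row.get("joindate").toString());
        String tableSpec = date;
        String tableDescription = "";
        return new TableDestination(tableSpec, tableDescription);
      }
    }).withFormatFunction(new SerializableFunction<TableRow, TableRow>() {
      @Override
      public TableRow apply(TableRow input) {
        // The elements are already TableRows, so pass them through unchanged.
        return input;
      }
    }).withSchema(schema)
        .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_TRUNCATE)
        .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED));

    p.run();
  }
}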

While running the above program I get the error below:

Exception in thread "main" org.apache.beam.sdk.Pipeline$PipelineExecutionException: java.lang.IllegalArgumentException: Table reference is not in [project_id]:[dataset_id].[table_id] format:
Caused by: java.lang.IllegalArgumentException: Table reference is not in [project_id]:[dataset_id].[table_id] format.

Let me know if there are any recommendations.

Beam does not support date-partitioned tables. See BEAM-2390 for the issue tracking this feature.
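For reference, BigQuery addresses a single day partition by appending a `$YYYYMMDD` decorator to the table name, which is the kind of table spec the question's `getDate` helper is trying to build. The following is only a minimal sketch of that string construction in plain Java, independent of Beam. The class name `PartitionSpec`, the method `forDate`, and the assumption that `joindate` arrives as an ISO date such as `2017-07-25` are all hypothetical; the question does not show the actual format of the field.

```java
import java.time.LocalDate;
import java.time.format.DateTimeFormatter;

public class PartitionSpec {
    // BigQuery day-partition decorators use the compact yyyyMMdd form.
    private static final DateTimeFormatter DECORATOR = DateTimeFormatter.BASIC_ISO_DATE;

    /**
     * Builds "<table>$yyyyMMdd" from a base table spec and a date string.
     * Assumption (not from the question): the date is ISO formatted, e.g. "2017-07-25".
     * LocalDate.parse throws DateTimeParseException for anything else.
     */
    public static String forDate(String baseTableSpec, String isoDate) {
        LocalDate date = LocalDate.parse(isoDate.trim());
        return baseTableSpec + "$" + date.format(DECORATOR);
    }

    public static void main(String[] args) {
        // Two dates in the input file map to two partition specs.
        System.out.println(forDate("project:dataset.dataflow_test", "2017-07-25"));
        System.out.println(forDate("project:dataset.dataflow_test", "2017-07-26"));
    }
}
```

Validating the date before building the spec means a malformed value fails early with a parse error rather than producing a table reference that BigQuery rejects. Whether a given Beam release accepts such a decorated spec as a write destination is a separate matter, covered by the issue mentioned above.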

