Partitioned table loading using a Dataflow job
I want to read a file and write it to a BigQuery partitioned table, based on the date value present in a field of the file. E.g. if the file contains two dates, 25 and 26 July, Dataflow should write the data to two partitions based on the dates present in the file.
    public class StarterPipeline {
      private static final Logger LOG = LoggerFactory.getLogger(StarterPipeline.class);

      public static void main(String[] args) {
        DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class);
        options.setProject("");
        options.setTempLocation("gs://stage_location/");
        Pipeline p = Pipeline.create(options);

        List<TableFieldSchema> fields = new ArrayList<>();
        fields.add(new TableFieldSchema().setName("id").setType("STRING"));
        fields.add(new TableFieldSchema().setName("name").setType("STRING"));
        fields.add(new TableFieldSchema().setName("designation").setType("STRING"));
        fields.add(new TableFieldSchema().setName("joindate").setType("STRING"));
        TableSchema schema = new TableSchema().setFields(fields);

        PCollection<String> read = p.apply("read lines", TextIO.read().from("gs://hadoop_source_files/employee.txt"));

        PCollection<TableRow> rows = read.apply(ParDo.of(new DoFn<String, TableRow>() {
          @ProcessElement
          public void processElement(ProcessContext c) {
            String[] data = c.element().split(",");
            c.output(new TableRow()
                .set("id", data[0])
                .set("name", data[1])
                .set("designation", data[2])
                .set("joindate", data[3]));
          }
        }));

        rows.apply(BigQueryIO.writeTableRows()
            .to(new SerializableFunction<ValueInSingleWindow<TableRow>, TableDestination>() {
              public String getDate(String value) {
                return "project:dataset.dataflow_test$" + value;
              }

              @Override
              public TableDestination apply(ValueInSingleWindow<TableRow> value) {
                TableRow row = value.getValue();
                String date = getDate(row.get("joindate").toString());
                String tableSpec = date;
                String tableDescription = "";
                return new TableDestination(tableSpec, tableDescription);
              }
            })
            .withFormatFunction(new SerializableFunction<TableRow, TableRow>() {
              @Override
              public TableRow apply(TableRow input) {
                // TODO Auto-generated method stub
                return input;
              }
            })
            .withSchema(schema)
            .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_TRUNCATE)
            .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED));

        p.run();
      }
    }
While running the above program I am getting the error below:

    Exception in thread "main" org.apache.beam.sdk.Pipeline$PipelineExecutionException: java.lang.IllegalArgumentException: Table reference is not in [project_id]:[dataset_id].[table_id] format:
    Caused by: java.lang.IllegalArgumentException: Table reference is not in [project_id]:[dataset_id].[table_id] format.

Please let me know if there are any recommendations.
Beam does not currently support date-partitioned tables. See BEAM-2390 for the issue tracking this feature.
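For reference, the per-row routing the question attempts comes down to turning the date field into a BigQuery partition decorator, which has the form table$YYYYMMDD. Below is a minimal sketch in plain Java, independent of Beam, of that one step. The class name PartitionSpec and method forDate are hypothetical, the question does not say how joindate is formatted, so ISO yyyy-MM-dd input is assumed here, and the year 2017 is a made-up example value.

```java
import java.time.LocalDate;
import java.time.format.DateTimeFormatter;

// Hypothetical helper, not part of Beam or the BigQuery client library.
public class PartitionSpec {
    // BASIC_ISO_DATE renders a LocalDate as yyyyMMdd, the shape a partition decorator uses.
    private static final DateTimeFormatter DECORATOR = DateTimeFormatter.BASIC_ISO_DATE;

    /**
     * Builds "baseTable$yyyyMMdd" from a date string.
     * Assumption: isoDate is in ISO yyyy-MM-dd form; anything else
     * makes LocalDate.parse throw a DateTimeParseException.
     */
    public static String forDate(String baseTable, String isoDate) {
        LocalDate date = LocalDate.parse(isoDate);
        return baseTable + "$" + date.format(DECORATOR);
    }

    public static void main(String[] args) {
        String base = "project:dataset.dataflow_test";
        System.out.println(forDate(base, "2017-07-25")); // prints project:dataset.dataflow_test$20170725
        System.out.println(forDate(base, "2017-07-26")); // prints project:dataset.dataflow_test$20170726
    }
}
```

Normalising the date up front at least ensures the value after the $ is a well-formed decorator. Whether the write transform accepts such a spec depends on the Beam version in use, as the answer above notes.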