Google BigQuery patterns
The examples on this page show common patterns for using BigQueryIO.
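BigQueryIO dead-letter pattern
In production systems, it is useful to implement the dead-letter pattern with BigQueryIO. Any element that hits an error during BigQueryIO processing is output to a separate PCollection for further processing. The examples below print the errors, but in a production system you would send them to a dead-letter table so that they can be fixed later.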
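The dead-letter idea itself does not depend on BigQuery. The following plain-Python sketch (no Beam involved) shows its shape: each element is attempted, and failures are kept together with their error message instead of aborting the whole run. `split_dead_letters` and `fake_write` are hypothetical names used only for illustration; in a real pipeline, BigQueryIO does this routing for you.

```python
from typing import Any, Callable, Dict, Iterable, List, Tuple

Row = Dict[str, Any]


def split_dead_letters(
    rows: Iterable[Row],
    write: Callable[[Row], None],
) -> Tuple[List[Row], List[Dict[str, Any]]]:
    """Try to write each row and route failures to a dead-letter list.

    Returns (written_rows, dead_letters). Each dead letter keeps the
    original row plus the error message so that it can be fixed later.
    """
    written: List[Row] = []
    dead_letters: List[Dict[str, Any]] = []
    for row in rows:
        try:
            write(row)
        except ValueError as err:
            # The failed element goes to the dead-letter output
            # instead of stopping the processing of the other rows.
            dead_letters.append({'row': row, 'error': str(err)})
        else:
            written.append(row)
    return written, dead_letters


def fake_write(row: Row) -> None:
    """Hypothetical sink that rejects a null in the REQUIRED field 'num'."""
    if row.get('num') is None:
        raise ValueError('Missing required field: num')


written, dead = split_dead_letters([{'num': 1}, {'num': None}], fake_write)
print(written)  # [{'num': 1}]
print(dead)     # [{'row': {'num': None}, 'error': 'Missing required field: num'}]
```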
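In the Java SDK, when you use STREAMING_INSERTS, you can use the WriteResult object to access a PCollection containing the TableRows that failed to insert into BigQuery (getFailedInserts()). If you also set the withExtendedErrorInfo property, you can instead access a PCollection<BigQueryInsertError> from the WriteResult (getFailedInsertsWithErr()). Each element of that PCollection contains references to the table, the row, and the InsertErrors. The InsertRetryPolicy determines which errors are added to the dead-letter queue; for example, InsertRetryPolicy.retryTransientErrors() retries transient errors and sends the others to the dead-letter queue.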
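To make the role of the retry policy concrete, here is a plain-Python sketch of how a "retry only transient errors" policy might classify a failed insert. It is not Beam code: `should_retry_transient` is a hypothetical helper, and the set of permanent error reasons is an assumption modeled on the reasons Beam treats as non-transient (`invalid`, `invalidQuery`, `notImplemented`). The exact set may differ between SDK versions.

```python
from typing import Dict, List

# Assumption: error reasons treated as permanent (not worth retrying).
# Check the InsertRetryPolicy / RetryStrategy of your SDK version.
NON_TRANSIENT_REASONS = {'invalid', 'invalidQuery', 'notImplemented'}


def should_retry_transient(errors: List[Dict[str, str]]) -> bool:
    """Return True if a failed insert should be retried.

    The row is retried only if none of its errors has a permanent
    reason; otherwise it goes to the dead-letter output.
    """
    return not any(e.get('reason') in NON_TRANSIENT_REASONS for e in errors)


# A missing REQUIRED field is reported with reason 'invalid': permanent.
print(should_retry_transient(
    [{'reason': 'invalid', 'message': 'Missing required field: num.'}]))  # False
# A backend error is assumed transient here, so it would be retried.
print(should_retry_transient([{'reason': 'backendError'}]))  # True
```

This matches the sample output further down, where the null in the REQUIRED `num` field fails with `reason=invalid` and therefore lands in the dead-letter PCollection instead of being retried.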
In the Python SDK, you can access the failed inserts through the FailedRows key of the result returned by WriteToBigQuery.
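Java SDK:
import com.google.api.services.bigquery.model.TableFieldSchema;
import com.google.api.services.bigquery.model.TableRow;
import com.google.api.services.bigquery.model.TableSchema;
import java.util.Collections;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryOptions;
import org.apache.beam.sdk.io.gcp.bigquery.InsertRetryPolicy;
import org.apache.beam.sdk.io.gcp.bigquery.WriteResult;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.values.TypeDescriptors;

public class BigQueryDeadLetterExample {
  public static void main(String[] args) {
    PipelineOptions options =
        PipelineOptionsFactory.fromArgs(args).withValidation().as(BigQueryOptions.class);
    Pipeline p = Pipeline.create(options);

    // Create a bug by writing the 2nd value as null. BigQuery correctly
    // rejects a null value in a REQUIRED field.
    WriteResult result =
        p.apply(Create.of(1, 2))
            .apply(
                BigQueryIO.<Integer>write()
                    .withSchema(
                        new TableSchema()
                            .setFields(
                                Collections.singletonList(
                                    new TableFieldSchema()
                                        .setName("num")
                                        .setType("INTEGER")
                                        .setMode("REQUIRED"))))
                    .to("Test.dummyTable")
                    .withFormatFunction(x -> new TableRow().set("num", (x == 2) ? null : x))
                    // Retry transient errors; send all other failures to the dead-letter output.
                    .withFailedInsertRetryPolicy(InsertRetryPolicy.retryTransientErrors())
                    // Force the bounded pipeline to use streaming inserts.
                    .withMethod(BigQueryIO.Write.Method.STREAMING_INSERTS)
                    // Attach the table and error details to each failed row.
                    .withExtendedErrorInfo()
                    .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
                    .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND));

    // Print each failed insert. In production, write these to a dead-letter table instead.
    result
        .getFailedInsertsWithErr()
        .apply(
            MapElements.into(TypeDescriptors.strings())
                .via(
                    x -> {
                      System.out.println(" The table was " + x.getTable());
                      System.out.println(" The row was " + x.getRow());
                      System.out.println(" The error was " + x.getError());
                      return "";
                    }));

    p.run().waitUntilFinish();

    /* Sample output from the pipeline:
    The table was GenericData{classInfo=[datasetId, projectId, tableId], {datasetId=Test,projectId=<>, tableId=dummyTable}}
    The row was GenericData{classInfo=[f], {num=null}}
    The error was GenericData{classInfo=[errors, index],{errors=[GenericData{classInfo=[debugInfo, location, message, reason], {debugInfo=,location=, message=Missing required field: Msg_0_CLOUD_QUERY_TABLE.num., reason=invalid}}],index=0}}
    */
  }
}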
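Python SDK:
import apache_beam as beam

# Create pipeline.
schema = {'fields': [{'name': 'a', 'type': 'INTEGER', 'mode': 'REQUIRED'}]}

with beam.Pipeline() as pipeline:
  # Create a bug by writing the 1st value as null. BigQuery correctly
  # rejects a null value in a REQUIRED field.
  errors = (
      pipeline
      | 'Data' >> beam.Create([1, 2])
      | 'CreateBrokenData' >>
      beam.Map(lambda src: {'a': src} if src == 2 else {'a': None})
      | 'WriteToBigQuery' >> beam.io.WriteToBigQuery(
          '<Your Project>:Test.dummy_a_table',
          schema=schema,
          # Failed rows are reported for streaming inserts, so force that
          # method for this bounded pipeline.
          method=beam.io.WriteToBigQuery.Method.STREAMING_INSERTS,
          insert_retry_strategy='RETRY_ON_TRANSIENT_ERROR',
          create_disposition='CREATE_IF_NEEDED',
          write_disposition='WRITE_APPEND'))

  # Print each failed row. In production, write these to a dead-letter
  # table instead.
  _ = (
      errors['FailedRows']
      | 'PrintErrors' >> beam.Map(lambda err: print('Error found: {}'.format(err))))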
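The example above only prints the failed rows. To send them to a dead-letter table instead, each failed element first has to be turned into a row that fits a fixed schema. The sketch below assumes that each `FailedRows` element is a `(destination, row)` pair, which is what the Python SDK emits for streaming inserts as far as we know (verify this for your SDK version). `DEAD_LETTER_SCHEMA` and `to_dead_letter_record` are hypothetical names, and the schema is only one possible design.

```python
import json
from datetime import datetime, timezone
from typing import Any, Dict, Tuple

# Hypothetical dead-letter table schema: the failed row is stored as a JSON
# string so that rows of any shape fit, plus where and when the insert failed.
DEAD_LETTER_SCHEMA = {
    'fields': [
        {'name': 'destination', 'type': 'STRING', 'mode': 'REQUIRED'},
        {'name': 'row_json', 'type': 'STRING', 'mode': 'REQUIRED'},
        {'name': 'failed_at', 'type': 'TIMESTAMP', 'mode': 'REQUIRED'},
    ]
}


def to_dead_letter_record(failed: Tuple[str, Dict[str, Any]]) -> Dict[str, str]:
    """Convert an assumed (destination, row) pair into a dead-letter row."""
    destination, row = failed
    return {
        'destination': destination,
        # sort_keys makes the stored JSON deterministic.
        'row_json': json.dumps(row, sort_keys=True),
        # ISO 8601 timestamp in UTC, e.g. '2024-10-31T12:00:00.000000+00:00'.
        'failed_at': datetime.now(timezone.utc).isoformat(),
    }


record = to_dead_letter_record(('my-project:Test.dummy_a_table', {'a': None}))
print(record['row_json'])  # {"a": null}
```

Inside the pipeline, you might apply it as `errors['FailedRows'] | beam.Map(to_dead_letter_record) | beam.io.WriteToBigQuery('<Your Project>:Test.dead_letters', schema=DEAD_LETTER_SCHEMA)`, where `Test.dead_letters` is a hypothetical table. Storing the row as a JSON string keeps the dead-letter schema stable even when the source table's schema changes.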
Last updated on 2024/10/31