Hadoop 輸入/輸出格式 IO
重要!先前稱為
HadoopInputFormatIO
的 Hadoop 輸入格式 IO 實作已自 Apache Beam 2.10 起棄用。請使用目前支援InputFormat
和OutputFormat
的HadoopFormatIO
。
HadoopFormatIO
是一個轉換,用於從任何實作 Hadoop InputFormat
的來源讀取資料,或將資料寫入任何實作 Hadoop OutputFormat
的接收器。例如,Cassandra、Elasticsearch、HBase、Redis、Postgres 等。
HadoopFormatIO
可讓您連線到許多尚未具有 Beam IO 轉換的資料來源/接收器。不過,HadoopFormatIO
在連線到 InputFormat
或 OutputFormat
時必須進行一些效能上的取捨。因此,如果有另一個 Beam IO 轉換專門用於連線到您選擇的資料來源/接收器,建議您使用該轉換。
使用 HadoopFormatIO 讀取
您需要傳遞一個 Hadoop Configuration
,其中包含指定讀取方式的參數。Configuration
的許多屬性是選擇性的,有些屬性對於某些 InputFormat
類別是必要的,但所有 InputFormat
類別都必須設定以下屬性
mapreduce.job.inputformat.class
- 用於連線到您選擇的資料來源的InputFormat
類別。key.class
-mapreduce.job.inputformat.class
中InputFormat
傳回的Key
類別。value.class
-mapreduce.job.inputformat.class
中InputFormat
傳回的Value
類別。
例如
Configuration myHadoopConfiguration = new Configuration(false);
// Set Hadoop InputFormat, key and value class in configuration
myHadoopConfiguration.setClass("mapreduce.job.inputformat.class", InputFormatClass,
InputFormat.class);
myHadoopConfiguration.setClass("key.class", InputFormatKeyClass, Object.class);
myHadoopConfiguration.setClass("value.class", InputFormatValueClass, Object.class);
您需要檢查 InputFormat
輸出的 Key
和 Value
類別是否具有可用的 Beam Coder
。如果沒有,您可以使用 withKeyTranslation
或 withValueTranslation
來指定將這些類別的執行個體轉換為 Beam Coder
支援的另一個類別的方法。這些設定是選擇性的,您不需要為鍵和值都指定轉換。
例如
SimpleFunction<InputFormatKeyClass, MyKeyClass> myOutputKeyType =
new SimpleFunction<InputFormatKeyClass, MyKeyClass>() {
public MyKeyClass apply(InputFormatKeyClass input) {
// ...logic to transform InputFormatKeyClass to MyKeyClass
}
};
SimpleFunction<InputFormatValueClass, MyValueClass> myOutputValueType =
new SimpleFunction<InputFormatValueClass, MyValueClass>() {
public MyValueClass apply(InputFormatValueClass input) {
// ...logic to transform InputFormatValueClass to MyValueClass
}
};
僅使用 Hadoop 組態讀取資料。
使用組態和鍵轉換讀取資料
例如,Beam Coder
不適用於 Key
類別,因此需要鍵轉換。
使用組態和值轉換讀取資料
例如,Beam Coder
不適用於 Value
類別,因此需要值轉換。
使用組態、值轉換和鍵轉換讀取資料
例如,Beam 編碼器不適用於 InputFormat
的 Key
類別和 Value
類別,因此需要鍵和值轉換。
特定 InputFormat 的範例
Cassandra - CqlInputFormat
若要從 Cassandra 讀取資料,請使用 org.apache.cassandra.hadoop.cql3.CqlInputFormat
,需要設定以下屬性
Configuration cassandraConf = new Configuration();
cassandraConf.set("cassandra.input.thrift.port", "9160");
cassandraConf.set("cassandra.input.thrift.address", CassandraHostIp);
cassandraConf.set("cassandra.input.partitioner.class", "Murmur3Partitioner");
cassandraConf.set("cassandra.input.keyspace", "myKeySpace");
cassandraConf.set("cassandra.input.columnfamily", "myColumnFamily");
cassandraConf.setClass("key.class", java.lang.Long Long.class, Object.class);
cassandraConf.setClass("value.class", com.datastax.driver.core.Row Row.class, Object.class);
cassandraConf.setClass("mapreduce.job.inputformat.class", org.apache.cassandra.hadoop.cql3.CqlInputFormat CqlInputFormat.class, InputFormat.class);
如下所示呼叫 Read 轉換
CqlInputFormat
鍵類別是 java.lang.Long
Long
,它具有 Beam Coder
。CqlInputFormat
值類別是 com.datastax.driver.core.Row
Row
,它沒有 Beam Coder
。您可以提供自己的轉換方法,而不是撰寫新的編碼器,如下所示
Elasticsearch - EsInputFormat
若要從 Elasticsearch 讀取資料,請使用 EsInputFormat
,需要設定以下屬性
Configuration elasticsearchConf = new Configuration();
elasticsearchConf.set("es.nodes", ElasticsearchHostIp);
elasticsearchConf.set("es.port", "9200");
elasticsearchConf.set("es.resource", "ElasticIndexName/ElasticTypeName");
elasticsearchConf.setClass("key.class", org.apache.hadoop.io.Text Text.class, Object.class);
elasticsearchConf.setClass("value.class", org.elasticsearch.hadoop.mr.LinkedMapWritable LinkedMapWritable.class, Object.class);
elasticsearchConf.setClass("mapreduce.job.inputformat.class", org.elasticsearch.hadoop.mr.EsInputFormat EsInputFormat.class, InputFormat.class);
如下所示呼叫 Read 轉換
org.elasticsearch.hadoop.mr.EsInputFormat
的 EsInputFormat
鍵類別是 org.apache.hadoop.io.Text
Text
,而其值類別是 org.elasticsearch.hadoop.mr.LinkedMapWritable
LinkedMapWritable
。鍵和值類別都有 Beam 編碼器。
HCatalog - HCatInputFormat
若要使用 HCatalog 讀取資料,請使用 org.apache.hive.hcatalog.mapreduce.HCatInputFormat
,需要設定以下屬性
Configuration hcatConf = new Configuration();
hcatConf.setClass("mapreduce.job.inputformat.class", HCatInputFormat.class, InputFormat.class);
hcatConf.setClass("key.class", LongWritable.class, Object.class);
hcatConf.setClass("value.class", HCatRecord.class, Object.class);
hcatConf.set("hive.metastore.uris", "thrift://metastore-host:port");
org.apache.hive.hcatalog.mapreduce.HCatInputFormat.setInput(hcatConf, "my_database", "my_table", "my_filter");
如下所示呼叫 Read 轉換
Amazon DynamoDB - DynamoDBInputFormat
若要從 Amazon DynamoDB 讀取資料,請使用 org.apache.hadoop.dynamodb.read.DynamoDBInputFormat
。DynamoDBInputFormat 實作較舊的 org.apache.hadoop.mapred.InputFormat
介面,為了使其與使用較新的抽象類別 org.apache.hadoop.mapreduce.InputFormat
的 HadoopFormatIO 相容,需要一個包裝函式 API 作為 HadoopFormatIO 和 DynamoDBInputFormat(或一般實作 org.apache.hadoop.mapred.InputFormat
的任何 InputFormat)之間的介面。以下範例使用一個可用的包裝函式 API - https://github.com/twitter/elephant-bird/blob/master/core/src/main/java/com/twitter/elephantbird/mapreduce/input/MapReduceInputFormatWrapper.java
Configuration dynamoDBConf = new Configuration();
Job job = Job.getInstance(dynamoDBConf);
com.twitter.elephantbird.mapreduce.input.MapReduceInputFormatWrapper.setInputFormat(org.apache.hadoop.dynamodb.read.DynamoDBInputFormat.class, job);
dynamoDBConf = job.getConfiguration();
dynamoDBConf.setClass("key.class", Text.class, WritableComparable.class);
dynamoDBConf.setClass("value.class", org.apache.hadoop.dynamodb.DynamoDBItemWritable.class, Writable.class);
dynamoDBConf.set("dynamodb.servicename", "dynamodb");
dynamoDBConf.set("dynamodb.input.tableName", "table_name");
dynamoDBConf.set("dynamodb.endpoint", "dynamodb.us-west-1.amazonaws.com");
dynamoDBConf.set("dynamodb.regionid", "us-west-1");
dynamoDBConf.set("dynamodb.throughput.read", "1");
dynamoDBConf.set("dynamodb.throughput.read.percent", "1");
dynamoDBConf.set("dynamodb.version", "2011-12-05");
dynamoDBConf.set(DynamoDBConstants.DYNAMODB_ACCESS_KEY_CONF, "aws_access_key");
dynamoDBConf.set(DynamoDBConstants.DYNAMODB_SECRET_KEY_CONF, "aws_secret_key");
如下所示呼叫 Read 轉換
Apache HBase - TableSnapshotInputFormat
要從 HBase 表格快照讀取資料,請使用 org.apache.hadoop.hbase.mapreduce.TableSnapshotInputFormat
。從表格快照讀取資料會繞過 HBase 區域伺服器,而是直接從檔案系統讀取 HBase 資料檔案。這對於讀取歷史資料或從 HBase 叢集卸載工作等情況非常有用。在某些情況下,這可能比使用 HBaseIO
通過區域伺服器存取內容更快。
可以使用 HBase shell 或程式化方式建立表格快照。
TableSnapshotInputFormat
的配置方式如下:
// Construct a typical HBase scan
Scan scan = new Scan();
scan.setCaching(1000);
scan.setBatch(1000);
scan.addColumn(Bytes.toBytes("CF"), Bytes.toBytes("col_1"));
scan.addColumn(Bytes.toBytes("CF"), Bytes.toBytes("col_2"));
Configuration hbaseConf = HBaseConfiguration.create();
hbaseConf.set(HConstants.ZOOKEEPER_QUORUM, "zk1:2181");
hbaseConf.set("hbase.rootdir", "/hbase");
hbaseConf.setClass(
"mapreduce.job.inputformat.class", TableSnapshotInputFormat.class, InputFormat.class);
hbaseConf.setClass("key.class", ImmutableBytesWritable.class, Writable.class);
hbaseConf.setClass("value.class", Result.class, Writable.class);
ClientProtos.Scan proto = ProtobufUtil.toScan(scan);
hbaseConf.set(TableInputFormat.SCAN, Base64.encodeBytes(proto.toByteArray()));
// Make use of existing utility methods
Job job = Job.getInstance(hbaseConf); // creates internal clone of hbaseConf
TableSnapshotInputFormat.setInput(job, "my_snapshot", new Path("/tmp/snapshot_restore"));
hbaseConf = job.getConfiguration(); // extract the modified clone
如下所示呼叫 Read 轉換
使用 HadoopFormatIO 寫入
您需要傳遞一個 Hadoop Configuration
,其中包含指定寫入如何發生的參數。Configuration
的許多屬性都是可選的,某些 OutputFormat
類別需要某些屬性,但以下屬性必須為所有 OutputFormat
設定:
mapreduce.job.id
- 寫入任務的識別符。例如:視窗的結束時間戳記。mapreduce.job.outputformat.class
- 用於連接到您選擇的資料接收器的OutputFormat
類別。mapreduce.job.output.key.class
- 傳遞給mapreduce.job.outputformat.class
中OutputFormat
的鍵類別。mapreduce.job.output.value.class
- 傳遞給mapreduce.job.outputformat.class
中OutputFormat
的值類別。mapreduce.job.reduces
- reduce 任務的數量。該值等於將產生的寫入任務的數量。對於Write.PartitionedWriterBuilder#withoutPartitioning()
寫入,此屬性不是必需的。mapreduce.job.partitioner.class
- 將用於在分割區之間分發記錄的 Hadoop 分割器類別。對於Write.PartitionedWriterBuilder#withoutPartitioning()
寫入,此屬性不是必需的。
注意:所有提到的值都有適當的常數。例如:HadoopFormatIO.OUTPUT_FORMAT_CLASS_ATTR
。
例如
Configuration myHadoopConfiguration = new Configuration(false);
// Set Hadoop OutputFormat, key and value class in configuration
myHadoopConfiguration.setClass("mapreduce.job.outputformat.class",
MyDbOutputFormatClass, OutputFormat.class);
myHadoopConfiguration.setClass("mapreduce.job.output.key.class",
MyDbOutputFormatKeyClass, Object.class);
myHadoopConfiguration.setClass("mapreduce.job.output.value.class",
MyDbOutputFormatValueClass, Object.class);
myHadoopConfiguration.setClass("mapreduce.job.partitioner.class",
MyPartitionerClass, Object.class);
myHadoopConfiguration.setInt("mapreduce.job.reduces", 2);
您需要在 Hadoop Configuration
中設定 OutputFormat
的鍵和值類別(即“mapreduce.job.output.key.class”和“mapreduce.job.output.value.class”),它們等於 KeyT
和 ValueT
。如果您設定的 OutputFormat
鍵或值類別與 OutputFormat
的實際鍵或值類別不同,則會拋出 IllegalArgumentException
。
批次寫入
// Data which will we want to write
PCollection<KV<Text, LongWritable>> boundedWordsCount = ...
// Hadoop configuration for write
// We have partitioned write, so Partitioner and reducers count have to be set - see withPartitioning() javadoc
Configuration myHadoopConfiguration = ...
// Path to directory with locks
String locksDirPath = ...;
boundedWordsCount.apply(
"writeBatch",
HadoopFormatIO.<Text, LongWritable>write()
.withConfiguration(myHadoopConfiguration)
.withPartitioning()
.withExternalSynchronization(new HDFSSynchronization(locksDirPath)));
串流寫入
// Data which will we want to write
PCollection<KV<Text, LongWritable>> unboundedWordsCount = ...;
// Transformation which transforms data of one window into one hadoop configuration
PTransform<PCollection<? extends KV<Text, LongWritable>>, PCollectionView<Configuration>>
configTransform = ...;
unboundedWordsCount.apply(
"writeStream",
HadoopFormatIO.<Text, LongWritable>write()
.withConfigurationTransform(configTransform)
.withExternalSynchronization(new HDFSSynchronization(locksDirPath)));
上次更新於 2024/10/31
您是否找到了您要尋找的所有內容?
這些內容是否有用且清楚?您有任何想要更改的地方嗎?請告訴我們!