Hadoop Input/Output Format IO

Important! The Hadoop Input Format IO implementation previously known as HadoopInputFormatIO has been deprecated since Apache Beam 2.10. Please use the current HadoopFormatIO, which supports both InputFormat and OutputFormat.

HadoopFormatIO is a transform for reading data from any source that implements Hadoop InputFormat, or for writing data to any sink that implements Hadoop OutputFormat. For example, Cassandra, Elasticsearch, HBase, Redis, Postgres, and so on.

HadoopFormatIO lets you connect to many data sources/sinks that do not yet have a dedicated Beam IO transform. However, HadoopFormatIO has to make several performance trade-offs when connecting to an InputFormat or OutputFormat, so if another Beam IO transform exists specifically for your data source/sink of choice, we recommend using that one.

Reading using HadoopFormatIO

You will need to pass a Hadoop Configuration with parameters specifying how the read will happen. Many properties of the Configuration are optional, and some are required for particular InputFormat classes, but the following must be set for all InputFormat classes: the InputFormat class itself (mapreduce.job.inputformat.class) and the key and value classes it returns (key.class and value.class), as shown below.

For example:

Configuration myHadoopConfiguration = new Configuration(false);
// Set Hadoop InputFormat, key and value class in configuration
myHadoopConfiguration.setClass("mapreduce.job.inputformat.class", InputFormatClass,
  InputFormat.class);
myHadoopConfiguration.setClass("key.class", InputFormatKeyClass, Object.class);
myHadoopConfiguration.setClass("value.class", InputFormatValueClass, Object.class);
  # The Beam SDK for Python does not support Hadoop Input/Output Format IO.

You will need to check whether the Key and Value classes output by the InputFormat have a Beam Coder available. If not, you can use withKeyTranslation or withValueTranslation to specify a method for transforming instances of those classes into another class that is supported by a Beam Coder. These settings are optional, and you don't need to specify translation for both key and value.

For example:

SimpleFunction<InputFormatKeyClass, MyKeyClass> myOutputKeyType =
new SimpleFunction<InputFormatKeyClass, MyKeyClass>() {
  public MyKeyClass apply(InputFormatKeyClass input) {
  // ...logic to transform InputFormatKeyClass to MyKeyClass
  }
};
SimpleFunction<InputFormatValueClass, MyValueClass> myOutputValueType =
new SimpleFunction<InputFormatValueClass, MyValueClass>() {
  public MyValueClass apply(InputFormatValueClass input) {
  // ...logic to transform InputFormatValueClass to MyValueClass
  }
};
  # The Beam SDK for Python does not support Hadoop Input/Output Format IO.

Read data only with Hadoop configuration.

p.apply("read",
  HadoopFormatIO.<InputFormatKeyClass, InputFormatValueClass>read()
  .withConfiguration(myHadoopConfiguration));
  # The Beam SDK for Python does not support Hadoop Input/Output Format IO.

Read data with configuration and key translation

For example, a Beam Coder is not available for the Key class, so key translation is required.

p.apply("read",
  HadoopFormatIO.<MyKeyClass, InputFormatValueClass>read()
  .withConfiguration(myHadoopConfiguration)
  .withKeyTranslation(myOutputKeyType));
  # The Beam SDK for Python does not support Hadoop Input/Output Format IO.

Read data with configuration and value translation

For example, a Beam Coder is not available for the Value class, so value translation is required.

p.apply("read",
  HadoopFormatIO.<InputFormatKeyClass, MyValueClass>read()
  .withConfiguration(myHadoopConfiguration)
  .withValueTranslation(myOutputValueType));
  # The Beam SDK for Python does not support Hadoop Input/Output Format IO.

Read data with configuration, value translation and key translation

For example, Beam Coders are not available for either the Key class or the Value class of the InputFormat, so both key and value translation are required.

p.apply("read",
  HadoopFormatIO.<MyKeyClass, MyValueClass>read()
  .withConfiguration(myHadoopConfiguration)
  .withKeyTranslation(myOutputKeyType)
  .withValueTranslation(myOutputValueType));
  # The Beam SDK for Python does not support Hadoop Input/Output Format IO.

Examples for specific InputFormats

Cassandra - CqlInputFormat

To read data from Cassandra, use org.apache.cassandra.hadoop.cql3.CqlInputFormat, which requires setting the following properties:

Configuration cassandraConf = new Configuration();
cassandraConf.set("cassandra.input.thrift.port", "9160");
cassandraConf.set("cassandra.input.thrift.address", CassandraHostIp);
cassandraConf.set("cassandra.input.partitioner.class", "Murmur3Partitioner");
cassandraConf.set("cassandra.input.keyspace", "myKeySpace");
cassandraConf.set("cassandra.input.columnfamily", "myColumnFamily");
cassandraConf.setClass("key.class", java.lang.Long Long.class, Object.class);
cassandraConf.setClass("value.class", com.datastax.driver.core.Row Row.class, Object.class);
cassandraConf.setClass("mapreduce.job.inputformat.class", org.apache.cassandra.hadoop.cql3.CqlInputFormat CqlInputFormat.class, InputFormat.class);
  # The Beam SDK for Python does not support Hadoop Input/Output Format IO.

Call the Read transform as follows:

PCollection<KV<Long, String>> cassandraData =
  p.apply("read",
  HadoopFormatIO.<Long, String>read()
  .withConfiguration(cassandraConf)
  .withValueTranslation(cassandraOutputValueType));
  # The Beam SDK for Python does not support Hadoop Input/Output Format IO.

The CqlInputFormat key class is java.lang.Long, which has a Beam Coder. The CqlInputFormat value class is com.datastax.driver.core.Row, which does not have a Beam Coder. Rather than writing a new coder, you can provide your own translation method, as follows:

SimpleFunction<Row, String> cassandraOutputValueType = new SimpleFunction<Row, String>()
{
  public String apply(Row row) {
    return row.getString("myColName");
  }
};
  # The Beam SDK for Python does not support Hadoop Input/Output Format IO.

Elasticsearch - EsInputFormat

To read data from Elasticsearch, use EsInputFormat, which requires setting the following properties:

Configuration elasticsearchConf = new Configuration();
elasticsearchConf.set("es.nodes", ElasticsearchHostIp);
elasticsearchConf.set("es.port", "9200");
elasticsearchConf.set("es.resource", "ElasticIndexName/ElasticTypeName");
elasticsearchConf.setClass("key.class", org.apache.hadoop.io.Text Text.class, Object.class);
elasticsearchConf.setClass("value.class", org.elasticsearch.hadoop.mr.LinkedMapWritable LinkedMapWritable.class, Object.class);
elasticsearchConf.setClass("mapreduce.job.inputformat.class", org.elasticsearch.hadoop.mr.EsInputFormat EsInputFormat.class, InputFormat.class);
  # The Beam SDK for Python does not support Hadoop Input/Output Format IO.

Call the Read transform as follows:

PCollection<KV<Text, LinkedMapWritable>> elasticData = p.apply("read",
  HadoopFormatIO.<Text, LinkedMapWritable>read().withConfiguration(elasticsearchConf));
  # The Beam SDK for Python does not support Hadoop Input/Output Format IO.

The org.elasticsearch.hadoop.mr.EsInputFormat key class is org.apache.hadoop.io.Text, and its value class is org.elasticsearch.hadoop.mr.LinkedMapWritable. Both the key and value classes have Beam Coders.
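
Because both sides already have coders, no translation is needed. If you want plain Java types downstream, one option is to unwrap the Writable values right after the read. This is only a sketch under assumptions: the field name "user" and the elasticData collection from the snippet above are illustrative, not part of this page.

// Minimal sketch (assumed field name "user"): extract one field from each
// LinkedMapWritable value and convert it to a plain String.
PCollection<String> userNames = elasticData.apply(
  "extractUser",
  MapElements.into(TypeDescriptors.strings())
    .via((KV<Text, LinkedMapWritable> kv) ->
        String.valueOf(kv.getValue().get(new Text("user")))));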

HCatalog - HCatInputFormat

To read data using HCatalog, use org.apache.hive.hcatalog.mapreduce.HCatInputFormat, which requires setting the following properties:

Configuration hcatConf = new Configuration();
hcatConf.setClass("mapreduce.job.inputformat.class", HCatInputFormat.class, InputFormat.class);
hcatConf.setClass("key.class", LongWritable.class, Object.class);
hcatConf.setClass("value.class", HCatRecord.class, Object.class);
hcatConf.set("hive.metastore.uris", "thrift://metastore-host:port");

org.apache.hive.hcatalog.mapreduce.HCatInputFormat.setInput(hcatConf, "my_database", "my_table", "my_filter");
  # The Beam SDK for Python does not support Hadoop Input/Output Format IO.

Call the Read transform as follows:

PCollection<KV<Long, HCatRecord>> hcatData =
  p.apply("read",
  HadoopFormatIO.<Long, HCatRecord>read()
  .withConfiguration(hcatConf));
  # The Beam SDK for Python does not support Hadoop Input/Output Format IO.

Amazon DynamoDB - DynamoDBInputFormat

To read data from Amazon DynamoDB, use org.apache.hadoop.dynamodb.read.DynamoDBInputFormat. DynamoDBInputFormat implements the older org.apache.hadoop.mapred.InputFormat interface. To make it compatible with HadoopFormatIO, which uses the newer abstract class org.apache.hadoop.mapreduce.InputFormat, a wrapper API is required to act as an adapter between HadoopFormatIO and DynamoDBInputFormat (or, in general, any InputFormat implementing org.apache.hadoop.mapred.InputFormat). The example below uses one available wrapper API - https://github.com/twitter/elephant-bird/blob/master/core/src/main/java/com/twitter/elephantbird/mapreduce/input/MapReduceInputFormatWrapper.java

Configuration dynamoDBConf = new Configuration();
Job job = Job.getInstance(dynamoDBConf);
com.twitter.elephantbird.mapreduce.input.MapReduceInputFormatWrapper.setInputFormat(org.apache.hadoop.dynamodb.read.DynamoDBInputFormat.class, job);
dynamoDBConf = job.getConfiguration();
dynamoDBConf.setClass("key.class", Text.class, WritableComparable.class);
dynamoDBConf.setClass("value.class", org.apache.hadoop.dynamodb.DynamoDBItemWritable.class, Writable.class);
dynamoDBConf.set("dynamodb.servicename", "dynamodb");
dynamoDBConf.set("dynamodb.input.tableName", "table_name");
dynamoDBConf.set("dynamodb.endpoint", "dynamodb.us-west-1.amazonaws.com");
dynamoDBConf.set("dynamodb.regionid", "us-west-1");
dynamoDBConf.set("dynamodb.throughput.read", "1");
dynamoDBConf.set("dynamodb.throughput.read.percent", "1");
dynamoDBConf.set("dynamodb.version", "2011-12-05");
dynamoDBConf.set(DynamoDBConstants.DYNAMODB_ACCESS_KEY_CONF, "aws_access_key");
dynamoDBConf.set(DynamoDBConstants.DYNAMODB_SECRET_KEY_CONF, "aws_secret_key");
  # The Beam SDK for Python does not support Hadoop Input/Output Format IO.

Call the Read transform as follows:

PCollection<KV<Text, DynamoDBItemWritable>> dynamoDBData =
  p.apply("read",
  HadoopFormatIO.<Text, DynamoDBItemWritable>read()
  .withConfiguration(dynamoDBConf));
  # The Beam SDK for Python does not support Hadoop Input/Output Format IO.

Apache HBase - TableSnapshotInputFormat

To read data from an HBase table snapshot, use org.apache.hadoop.hbase.mapreduce.TableSnapshotInputFormat. Reading from a table snapshot bypasses the HBase region servers and instead reads the HBase data files directly from the filesystem. This is useful for cases such as reading historical data or offloading work from the HBase cluster. In certain scenarios it may be faster than accessing the content through the region servers with HBaseIO.

A table snapshot can be taken using the HBase shell (for example, snapshot 'my_table', 'my_snapshot') or programmatically:

try (
    Connection connection = ConnectionFactory.createConnection(hbaseConf);
    Admin admin = connection.getAdmin()
  ) {
  admin.snapshot(
    "my_snaphshot",
    TableName.valueOf("my_table"),
    HBaseProtos.SnapshotDescription.Type.FLUSH);
}
  # The Beam SDK for Python does not support Hadoop Input/Output Format IO.

TableSnapshotInputFormat is configured as follows:

// Construct a typical HBase scan
Scan scan = new Scan();
scan.setCaching(1000);
scan.setBatch(1000);
scan.addColumn(Bytes.toBytes("CF"), Bytes.toBytes("col_1"));
scan.addColumn(Bytes.toBytes("CF"), Bytes.toBytes("col_2"));

Configuration hbaseConf = HBaseConfiguration.create();
hbaseConf.set(HConstants.ZOOKEEPER_QUORUM, "zk1:2181");
hbaseConf.set("hbase.rootdir", "/hbase");
hbaseConf.setClass(
    "mapreduce.job.inputformat.class", TableSnapshotInputFormat.class, InputFormat.class);
hbaseConf.setClass("key.class", ImmutableBytesWritable.class, Writable.class);
hbaseConf.setClass("value.class", Result.class, Writable.class);
ClientProtos.Scan proto = ProtobufUtil.toScan(scan);
hbaseConf.set(TableInputFormat.SCAN, Base64.encodeBytes(proto.toByteArray()));

// Make use of existing utility methods
Job job = Job.getInstance(hbaseConf); // creates internal clone of hbaseConf
TableSnapshotInputFormat.setInput(job, "my_snapshot", new Path("/tmp/snapshot_restore"));
hbaseConf = job.getConfiguration(); // extract the modified clone
  # The Beam SDK for Python does not support Hadoop Input/Output Format IO.

Call the Read transform as follows:

PCollection<KV<ImmutableBytesWritable, Result>> hbaseSnapshotData =
  p.apply("read",
  HadoopFormatIO.<ImmutableBytesWritable, Result>read()
  .withConfiguration(hbaseConf));
  # The Beam SDK for Python does not support Hadoop Input/Output Format IO.

Writing using HadoopFormatIO

You will need to pass a Hadoop Configuration with parameters specifying how the write will happen. Many properties of the Configuration are optional and some are required for particular OutputFormat classes, but the following must be set for all OutputFormats: the OutputFormat class itself (mapreduce.job.outputformat.class), the key and value classes it writes (mapreduce.job.output.key.class and mapreduce.job.output.value.class), and, for partitioned writes, the partitioner class and the number of reduce tasks (mapreduce.job.partitioner.class and mapreduce.job.reduces), as in the example below.

Note: All the mentioned property names have corresponding constants. For example: HadoopFormatIO.OUTPUT_FORMAT_CLASS_ATTR.

For example:

Configuration myHadoopConfiguration = new Configuration(false);
// Set Hadoop OutputFormat, key and value class in configuration
myHadoopConfiguration.setClass("mapreduce.job.outputformat.class",
   MyDbOutputFormatClass, OutputFormat.class);
myHadoopConfiguration.setClass("mapreduce.job.output.key.class",
   MyDbOutputFormatKeyClass, Object.class);
myHadoopConfiguration.setClass("mapreduce.job.output.value.class",
   MyDbOutputFormatValueClass, Object.class);
myHadoopConfiguration.setClass("mapreduce.job.partitioner.class",
   MyPartitionerClass, Object.class);
myHadoopConfiguration.setInt("mapreduce.job.reduces", 2);
  # The Beam SDK for Python does not support Hadoop Input/Output Format IO.
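
As noted above, you can use the constants on HadoopFormatIO instead of the raw property names. A minimal sketch using the one constant confirmed on this page (HadoopFormatIO.OUTPUT_FORMAT_CLASS_ATTR):

// Equivalent to setting "mapreduce.job.outputformat.class" with a string literal
myHadoopConfiguration.setClass(
   HadoopFormatIO.OUTPUT_FORMAT_CLASS_ATTR, MyDbOutputFormatClass, OutputFormat.class);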

You will need to set the OutputFormat key and value classes (i.e. "mapreduce.job.output.key.class" and "mapreduce.job.output.value.class") in the Hadoop Configuration so that they are equal to KeyT and ValueT. If you set different OutputFormat key or value classes than the OutputFormat's actual key or value classes, an IllegalArgumentException is thrown.
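
As a sketch tying this requirement to the batch example below, where KeyT is Text and ValueT is LongWritable, the configuration must declare exactly those classes:

// KeyT = Text and ValueT = LongWritable in the write below, so the
// configured output key/value classes must match them exactly.
myHadoopConfiguration.setClass("mapreduce.job.output.key.class",
   Text.class, Object.class);
myHadoopConfiguration.setClass("mapreduce.job.output.value.class",
   LongWritable.class, Object.class);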

Batch writing

// Data which we want to write
PCollection<KV<Text, LongWritable>> boundedWordsCount = ...

// Hadoop configuration for write
// This is a partitioned write, so the Partitioner and the number of reducers have to be set - see withPartitioning() javadoc
Configuration myHadoopConfiguration = ...
// Path to directory with locks
String locksDirPath = ...;

boundedWordsCount.apply(
    "writeBatch",
    HadoopFormatIO.<Text, LongWritable>write()
        .withConfiguration(myHadoopConfiguration)
        .withPartitioning()
        .withExternalSynchronization(new HDFSSynchronization(locksDirPath)));
  # The Beam SDK for Python does not support Hadoop Input/Output Format IO.

Stream writing

// Data which we want to write
PCollection<KV<Text, LongWritable>> unboundedWordsCount = ...;

// Transformation which turns the data of one window into one Hadoop configuration
PTransform<PCollection<? extends KV<Text, LongWritable>>, PCollectionView<Configuration>>
  configTransform = ...;

unboundedWordsCount.apply(
  "writeStream",
  HadoopFormatIO.<Text, LongWritable>write()
      .withConfigurationTransform(configTransform)
      .withExternalSynchronization(new HDFSSynchronization(locksDirPath)));
  # The Beam SDK for Python does not support Hadoop Input/Output Format IO.