內建輸入/輸出轉換

Web API 輸入/輸出連接器

Beam SDK 包含一個稱為 RequestResponseIO 的內建轉換,以支援使用 Web API (例如 REST 或 gRPC) 進行讀取和寫入。

以下討論重點在 Java SDK。Python 範例將在未來新增;請參閱追蹤器問題:#30422。此外,Go SDK 的支援尚未推出;請參閱追蹤器問題:#30423

RequestResponseIO 功能

此轉換提供的功能包括

本指南目前著重於上述前兩點,即最少程式碼需求和錯誤處理。未來可能會擴展以顯示其他功能的範例。以下提供其他資源的連結。

其他資源

開始之前

若要使用 RequestResponseIO,請將相依性新增至您的 Gradle build.gradle(.kts)Maven pom.xml 檔案。請參閱 Maven Central 以取得可用的版本。

以下顯示一個範例,將 Beam BOM 和相關相依性 (例如 Beam 核心) 新增至您的 build.gradle(.kts) 檔案。

// Apache Beam BOM
// https://central.sonatype.com/artifact/org.apache.beam/beam-sdks-java-bom
implementation("org.apache.beam:beam-sdks-java-bom:2.60.0")

// Beam Core SDK
// https://central.sonatype.com/artifact/org.apache.beam/beam-sdks-java-core
implementation("org.apache.beam:beam-sdks-java-core")

// RequestResponseIO dependency
// https://central.sonatype.com/artifact/org.apache.beam/beam-sdks-java-io-rrio
implementation("org.apache.beam:beam-sdks-java-io-rrio")

或者,使用 Maven,將成品相依性新增至您的 pom.xml 檔案。

<dependency>
    <groupId>org.apache.beam</groupId>
    <artifactId>beam-sdks-java-io-rrio</artifactId>
    <version>2.60.0</version>
</dependency>

RequestResponseIO 基礎

最少程式碼

從 Web API 讀取或寫入所需的最少程式碼是

  1. Caller 實作。
  2. 實例化 RequestResponseIO

實作 Caller

Caller 只需要覆寫一個方法:call,其目的是與 API 互動,將請求轉換為回應。轉換的 DoFn 會在其 DoFn.ProcessElement 方法中呼叫此方法。轉換會處理其他所有事項,包括重複失敗的請求和指數退避 (以下將進一步討論)。

// MyCaller invokes a Web API with MyRequest and returns the resulting MyResponse.
class MyCaller<MyRequest, MyResponse> implements Caller<MyRequest, MyResponse> {

    @Override
    public MyResponse call(MyRequest request) throws UserCodeExecutionException {

        // Do something with request and return the response.

    }

}

實例化 RequestResponseIO

使用 RequestResponseIO 就如下方所示般簡單。如前所述,它最少需要兩個參數:Caller 和預期回應的 Coder。(注意:如果您不熟悉 Beam Coder 的概念,請參閱關於此主題的Apache Beam 程式設計指南。本指南下方也有範例。)

RequestResponseIO 轉換會傳回一個 Result,其中包含任何失敗和成功回應的 PCollection。在 Beam 中,我們稱之為額外輸出模式,通常需要一些樣板程式碼,但轉換會為您處理。使用轉換,您可以透過 Result::getFailuresResult::getResponses 取得成功和失敗的 PCollection

以下顯示一個簡短的程式碼片段,說明轉換在您的管道中如何運作。

// Step 1. Define the Coder for the response.
Coder<MyResponse> responseCoder = ...

// Step 2. Build the request PCollection.
PCollection<MyRequest> requests = ...

// Step 3. Instantiate the RequestResponseIO with the Caller and Coder and apply it to the request PCollection.
Result<MyResponse> result = requests.apply(RequestResponseIO.of(new MyCaller(), responseCoder));

// Step 4a. Do something with the responses.
result.getResponses().apply( ... );

// Step 4b. Apply failures to a dead letter sink.
result.getFailures().apply( ... );

RequestResponseIO 會處理呼叫每個請求的 Caller 所需的其他所有事項。它不關心您在 Caller 內執行什麼動作,無論您是進行原始 HTTP 呼叫還是使用用戶端程式碼。本指南稍後會討論此設計對於測試的優點。

API 呼叫重複與失敗

如上所述,RequestResponseIO 會傳回一個 Result,其中包含從您的 Caller 產生的成功和失敗 PCollection。本節提供更多關於處理失敗和特定 API 呼叫重複與退避的詳細資訊。

處理失敗

失敗是 ApiIOError PCollection,您可以將其套用至記錄轉換或將錯誤儲存至下游接收器的轉換,以供稍後分析和疑難排解。

由於 ApiIOError 已對應到 Beam Schema,因此它與 Beam 大部分現有的 I/O 連接器相容。(注意:如果您對 Beam Schema 的概念不熟悉,請參閱Beam 程式設計指南)例如,您可以輕鬆地將 ApiIOError 記錄傳送到 BigQuery 進行分析和疑難排解,如下所示,無需先將記錄轉換為 TableRow

static void writeFailuresToBigQuery(
    PCollection<ApiIOError> failures,
    TableReference tableReference,
    BigQueryIO.Write.CreateDisposition createDisposition,
    BigQueryIO.Write.WriteDisposition writeDisposition) {

  // PCollection<ApiIOError> failures = ...
  // TableReference tableReference = ...
  // BigQueryIO.Write.CreateDisposition createDisposition = ...
  // BigQueryIO.Write.WriteDisposition writeDisposition = ...

  failures.apply(
      "Dead letter",
      BigQueryIO.<ApiIOError>write()
          .useBeamSchema()
          .to(tableReference)
          .withCreateDisposition(createDisposition)
          .withWriteDisposition(writeDisposition));
}

API 呼叫重複與退避

在發送到失敗的 PCollection 之前,轉換會對某些錯誤執行重試,並在規定的指數退避後進行。您的 Caller 必須拋出特定的錯誤,以通知轉換執行退避重試。拋出 UserCodeExecutionException 將立即將錯誤發送到 ApiIOError PCollection

Caller 拋出以下錯誤時,RequestResponseIO 將嘗試退避重試:

在達到重試次數的閾值後,錯誤將發送到失敗的 PCollection

測試

由於 RequestResponseIO 不關心您在 Caller 實作中做什麼,這使得一些測試更加方便。您只需實作一個版本的 Caller,根據您的測試邏輯返回回應或拋出例外,而無需依賴某些測試中對真實 API 的直接調用,從而依賴您的外部資源。例如,如果您想測試管道中針對特定回應(例如空記錄)的下游步驟,您可以透過以下方式輕鬆完成。有關測試 Beam 管道的更多資訊,請參閱Beam 程式設計指南

@Test
void givenEmptyResponse_thenExpectSomething() {
    // Test expects PTransform underTest should do something as a result of empty records, for example.
    PTransform<Iterable<String>, ?> underTest = ...

    PCollection<String> requests = pipeline.apply(Create.of("aRequest"));
    IterableCoder<String> coder = IterableCoder.of(StringUtf8Coder.of());
    Result<Iterable<String>> result = requests.apply(RequestResponseIO.of(new MockEmptyIterableResponse()), coder);

    PAssert.that(result.getResponses().apply(underTest)).containsInAnyOrder(...)

    pipeline.run();
}

// MockEmptyIterableResponse simulates when there are no results from the API.
class MockEmptyIterableResponse<String, Iterable<String>> implements Caller<String, Iterable<String>> {
@Override
    public Iterable<String> call(String request) throws UserCodeExecutionException {
        return Collections.emptyList();
    }
}

實務範例

以下顯示了兩個範例,我們將在端對端 Beam 管道中將它們結合在一起。此管道的目標是下載影像並使用 Vertex AI 上的 Gemini 來識別影像內容。

請注意,此範例不會取代我們目前的 AI/ML 解決方案。有關將 Beam 與 AI/ML 搭配使用的更多詳細資訊,請參閱AI/ML 管道入門

直接使用 HTTP 呼叫

我們首先需要下載影像。為此,我們需要對影像 URL 進行 HTTP 呼叫,並將它們的內容發送到 PCollection,以供 Gemini API 使用。此範例本身的價值在於它示範了如何使用 RequestResponseIO 進行原始 HTTP 請求。

定義 Caller

我們實作了 Caller,即 HttpImageClient,它接收一個 ImageRequest 並返回一個 ImageResponse

為了演示目的,此範例使用 KV 來保留返回的 ImageResponse 中包含 KV 的原始 URL。

簡短程式碼片段

以下顯示了 HttpImageClient 的簡短版本,其中顯示了重要的部分。

class HttpImageClient implements Caller<KV<String, ImageRequest>, KV<String, ImageResponse>> {

    private static final HttpRequestFactory REQUEST_FACTORY =
        new NetHttpTransport().createRequestFactory();

    @Override
    public KV<String, ImageResponse> call(KV<String, ImageRequest> requestKV) throws UserCodeExecutionException {

        ImageRequest request = requestKV.getValue();
        GenericUrl url = new GenericUrl(request.getImageUrl());
        HttpRequest imageRequest = REQUEST_FACTORY.buildGetRequest(url);
        HttpResponse response = imageRequest.execute();

        return KV.of(
            requestKV.getKey(),
            ImageResponse
                .builder()
                // Build ImageResponse from HttpResponse
                .build()
        );
    }

}
完整範例

以下顯示了完整實作,說明了根據 HTTP 回應碼拋出各種例外的情況。

/**
 * Implements {@link Caller} to process an {@link ImageRequest} into an {@link ImageResponse} by
 * invoking the HTTP request.
 */
class HttpImageClient implements Caller<KV<String, ImageRequest>, KV<String, ImageResponse>> {

  private static final int STATUS_TOO_MANY_REQUESTS = 429;
  private static final int STATUS_TIMEOUT = 408;
  private static final HttpRequestFactory REQUEST_FACTORY =
      new NetHttpTransport().createRequestFactory();

  static HttpImageClient of() {
    return new HttpImageClient();
  }

  /**
   * Invokes an HTTP Get request from the {@param request}, returning an {@link ImageResponse}
   * containing the image data.
   */
  @Override
  public KV<String, ImageResponse> call(KV<String, ImageRequest> requestKV)
      throws UserCodeExecutionException {

    String key = requestKV.getKey();
    ImageRequest request = requestKV.getValue();
    Preconditions.checkArgument(request != null);
    GenericUrl url = new GenericUrl(request.getImageUrl());

    try {
      HttpRequest imageRequest = REQUEST_FACTORY.buildGetRequest(url);
      HttpResponse response = imageRequest.execute();

      if (response.getStatusCode() >= 500) {
        // Tells transform to repeat the request.
        throw new UserCodeRemoteSystemException(response.getStatusMessage());
      }

      if (response.getStatusCode() >= 400) {

        switch (response.getStatusCode()) {
          case STATUS_TOO_MANY_REQUESTS:
            // Tells transform to repeat the request.
            throw new UserCodeQuotaException(response.getStatusMessage());

          case STATUS_TIMEOUT:
            // Tells transform to repeat the request.
            throw new UserCodeTimeoutException(response.getStatusMessage());

          default:
            // Tells the tranform to emit immediately into failure PCollection.
            throw new UserCodeExecutionException(response.getStatusMessage());
        }
      }

      InputStream is = response.getContent();
      byte[] bytes = ByteStreams.toByteArray(is);

      return KV.of(
          key,
          ImageResponse.builder()
              .setMimeType(request.getMimeType())
              .setData(ByteString.copyFrom(bytes))
              .build());

    } catch (IOException e) {

      // Tells the tranform to emit immediately into failure PCollection.
      throw new UserCodeExecutionException(e);
    }
  }
}

定義請求

ImageRequest 是我們提供給 HttpImageClient 的自訂請求(在上面的範例中定義),以調用 HTTP 呼叫來獲取影像。此範例恰好使用 Google AutoValue,但是您可以像在任何 Beam PCollection 中一樣使用任何自訂的 Serializable Java 類別,包括固有的 Java 類別(例如 StringDouble 等)。為了方便起見,此範例使用 @DefaultSchema(AutoValueSchema.class),允許我們根據其 getter 自動將我們的自訂類型對應到 Beam Schema

/** An HTTP request for an image. */
@DefaultSchema(AutoValueSchema.class)
@AutoValue
abstract class ImageRequest implements Serializable {

  static final TypeDescriptor<ImageRequest> TYPE = TypeDescriptor.of(ImageRequest.class);
  private static final Map<String, String> EXT_MIMETYPE_MAP =
      ImmutableMap.of(
          "jpg", "image/jpeg",
          "jpeg", "image/jpeg",
          "png", "image/png");

  /** Derive the MIME type of the image from the url based on its extension. */
  private static String mimeTypeOf(String url) {
    String ext = FileNameUtils.getExtension(url);
    if (!EXT_MIMETYPE_MAP.containsKey(ext)) {
      throw new IllegalArgumentException(
          String.format("could not map extension to mimetype: ext %s of url: %s", ext, url));
    }
    return EXT_MIMETYPE_MAP.get(ext);
  }

  static Builder builder() {
    return new AutoValue_ImageRequest.Builder();
  }

  /** Build an {@link ImageRequest} from a {@param url}. */
  static ImageRequest of(String url) {
    return builder().setImageUrl(url).setMimeType(mimeTypeOf(url)).build();
  }

  /** The URL of the image request. */
  abstract String getImageUrl();

  /** The MIME type of the image request. */
  abstract String getMimeType();

  @AutoValue.Builder
  abstract static class Builder {
    abstract Builder setImageUrl(String value);

    abstract Builder setMimeType(String value);

    abstract ImageRequest build();
  }
}

定義回應

ImageResponse 是我們從 HttpImageClient 返回的自訂回應(在上面的範例中定義),其中包含影像資料,作為使用影像 URL 呼叫遠端伺服器的結果。同樣,此範例恰好使用 Google AutoValue,但是您可以像在任何 Beam PCollection 中一樣使用任何自訂的 Serializable Java 類別,包括固有的 Java 類別(例如 StringDouble 等)。

/** An HTTP response of an image request. */
@DefaultSchema(AutoValueSchema.class)
@AutoValue
abstract class ImageResponse implements Serializable {

  static Builder builder() {
    return new AutoValue_ImageResponse.Builder();
  }

  /** The MIME type of the response payload. */
  abstract String getMimeType();

  /** The payload of the response containing the image data. */
  abstract ByteString getData();

  @AutoValue.Builder
  abstract static class Builder {
    abstract Builder setMimeType(String value);

    abstract Builder setData(ByteString value);

    abstract ImageResponse build();
  }
}

定義回應編碼器

RequestResponseIO 需要回應的 Coder 作為其第二個必要的參數,如下面的範例所示。有關 Beam Coder 的更多資訊,請參閱Beam 程式設計指南

/** A {@link CustomCoder} of an {@link ImageResponse}. */
class ImageResponseCoder extends CustomCoder<ImageResponse> {
  public static ImageResponseCoder of() {
    return new ImageResponseCoder();
  }

  private static final Coder<byte[]> BYTE_ARRAY_CODER = ByteArrayCoder.of();
  private static final Coder<String> STRING_CODER = StringUtf8Coder.of();

  @Override
  public void encode(ImageResponse value, OutputStream outStream)
      throws CoderException, IOException {
    BYTE_ARRAY_CODER.encode(value.getData().toByteArray(), outStream);
    STRING_CODER.encode(value.getMimeType(), outStream);
  }

  @Override
  public ImageResponse decode(InputStream inStream) throws CoderException, IOException {
    byte[] data = BYTE_ARRAY_CODER.decode(inStream);
    String mimeType = STRING_CODER.decode(inStream);
    return ImageResponse.builder().setData(ByteString.copyFrom(data)).setMimeType(mimeType).build();
  }
}

從 URL 取得影像資料

以下範例說明如何在端對端管道中將所有內容結合在一起。從影像 URL 列表,此範例建立 ImageRequestPCollection,該 PCollection 會應用到使用 HttpImageClient Caller 實作的已實例化 RequestResponseIO

ResultgetFailures getter 存取的任何失敗都會輸出到記錄。如上所述,可以將這些失敗寫入資料庫或檔案系統。

  /** Example demonstrating downloading a list of image URLs using {@link RequestResponseIO}. */
  static void readFromGetEndpointExample(List<String> urls, Pipeline pipeline) {
    //        Pipeline pipeline = Pipeline.create();
    //        List<String> urls = ImmutableList.of(
    //                "https://storage.googleapis.com/generativeai-downloads/images/cake.jpg",
    //                "https://storage.googleapis.com/generativeai-downloads/images/chocolate.png",
    //                "https://storage.googleapis.com/generativeai-downloads/images/croissant.jpg",
    //                "https://storage.googleapis.com/generativeai-downloads/images/dog_form.jpg",
    //                "https://storage.googleapis.com/generativeai-downloads/images/factory.png",
    //                "https://storage.googleapis.com/generativeai-downloads/images/scones.jpg"
    //        );

    // Step 1: Convert the list of URLs to a PCollection of ImageRequests.
    PCollection<KV<String, ImageRequest>> requests = Images.requestsOf(urls, pipeline);

    // Step 2: RequestResponseIO requires a Coder as its second parameter.
    KvCoder<String, ImageResponse> responseCoder =
        KvCoder.of(StringUtf8Coder.of(), ImageResponseCoder.of());

    // Step 3: Process ImageRequests using RequestResponseIO instantiated from the Caller
    // implementation and the expected PCollection response Coder.
    Result<KV<String, ImageResponse>> result =
        requests.apply(
            ImageResponse.class.getSimpleName(),
            RequestResponseIO.of(HttpImageClient.of(), responseCoder));

    // Step 4: Log any failures to stderr.
    result.getFailures().apply("logErrors", Log.errorOf());

    // Step 5: Log output to stdout.
    Images.displayOf(result.getResponses()).apply("logResponses", Log.infoOf());
  }

如下所示的管道輸出顯示了下載影像的摘要、其 URL、MIME 類型和大小。

KV{https://storage.googleapis.com/generativeai-downloads/images/factory.png, mimeType=image/png, size=23130}
KV{https://storage.googleapis.com/generativeai-downloads/images/scones.jpg, mimeType=image/jpeg, size=394671}
KV{https://storage.googleapis.com/generativeai-downloads/images/cake.jpg, mimeType=image/jpeg, size=253809}
KV{https://storage.googleapis.com/generativeai-downloads/images/chocolate.png, mimeType=image/png, size=29375}
KV{https://storage.googleapis.com/generativeai-downloads/images/croissant.jpg, mimeType=image/jpeg, size=207281}
KV{https://storage.googleapis.com/generativeai-downloads/images/dog_form.jpg, mimeType=image/jpeg, size=1121752}

使用 API 用戶端程式碼

最後一個範例示範了直接調用 HTTP 請求。但是,有些 API 服務會提供應在 Caller 實作中使用的用戶端程式碼。在 Beam 中使用用戶端程式碼會帶來獨特的挑戰,即序列化。此外,某些用戶端程式碼需要在設定和拆解方面進行明確的處理。

RequestResponseIO 可以處理稱為 SetupTeardown 的額外介面,以處理這些情況。

SetupTeardown 介面只有兩個方法:setup 和 teardown。

interface SetupTeardown {
    void setup() throws UserCodeExecutionException;
    void teardown() throws UserCodeExecutionException;
}

轉換會在 DoFn 的 @Setup@Teardown 方法中分別呼叫這些 setup 和 teardown 方法。

轉換還會處理退避重試,同樣取決於拋出的例外,如本指南前面所述。

定義具有 SetupTeardown 的 Caller

以下範例改編了 Vertex AI Gemini Java 用戶端,以使用 RequestResponseIO 在 Beam 管道中工作,除了必要的 Caller 之外,還新增了 SetupTeardown 介面的用法。它比上面的簡單 HTTP 範例多了一些樣板。

簡短程式碼片段

以下顯示了一個簡短的程式碼片段,其中顯示了重要的部分。

setup 方法是 GeminiAIClient 實例化 VertexAIGenerativeModel 的地方,最後在 teardown 期間關閉 VertexAI。最後,其 call 方法與上面的 HTTP 範例類似,其中它接收請求、使用它來調用 API 並返回回應。

class GeminiAIClient implements
    Caller<KV<String, GenerateContentRequest>, KV<String, GenerateContentResponse>>,
    SetupTeardown {

    @Override
    public KV<String, GenerateContentResponse> call(KV<String, GenerateContentRequest> requestKV)
    throws UserCodeExecutionException {
        GenerateContentResponse response = client.generateContent(request.getContentsList());
        return KV.of(requestKV.getKey(), response);
    }

    @Override
    public void setup() throws UserCodeExecutionException {
        vertexAI = new VertexAI(getProjectId(), getLocation());
        client = new GenerativeModel(getModelName(), vertexAI);
    }

    @Override
    public void teardown() throws UserCodeExecutionException {
        vertexAI.close();
    }
}
完整範例

以下顯示了完整的範例。此範例的關鍵是 com.google.cloud.vertexai.VertexAIcom.google.cloud.vertexai.generativeai.GenerativeModel 是不可序列化的,因此需要使用 transient 實例化。如果您的 Java 專案未使用 https://checkerframework.org/,則可以忽略 @MonotonicNonNull

/**
 * Example {@link Caller} and {@link SetupTeardown} implementation for use with {@link
 * RequestResponseIO} to process Gemini AI {@link GenerateContentRequest}s into {@link
 * GenerateContentResponse}s.
 */
@AutoValue
abstract class GeminiAIClient
    implements Caller<KV<String, GenerateContentRequest>, KV<String, GenerateContentResponse>>,
        SetupTeardown {

  static Builder builder() {
    return new AutoValue_GeminiAIClient.Builder();
  }

  static final String MODEL_GEMINI_PRO = "gemini-pro";
  static final String MODEL_GEMINI_PRO_VISION = "gemini-pro-vision";

  private transient @MonotonicNonNull VertexAI vertexAI;
  private transient @MonotonicNonNull GenerativeModel client;

  @Override
  public KV<String, GenerateContentResponse> call(KV<String, GenerateContentRequest> requestKV)
      throws UserCodeExecutionException {

    String key = requestKV.getKey();
    GenerateContentRequest request = requestKV.getValue();

    if (request == null) {
      throw new UserCodeExecutionException("request is empty");
    }

    if (request.getContentsList().isEmpty()) {
      throw new UserCodeExecutionException("contentsList is empty");
    }

    try {

      GenerateContentResponse response =
          checkStateNotNull(client).generateContent(request.getContentsList());

      return KV.of(key, response);

    } catch (IOException e) {
      throw new UserCodeExecutionException(e);
    }
  }

  @Override
  public void setup() throws UserCodeExecutionException {
    vertexAI = new VertexAI(getProjectId(), getLocation());
    client = new GenerativeModel(getModelName(), vertexAI);
  }

  @Override
  public void teardown() throws UserCodeExecutionException {
    if (vertexAI != null) {
      vertexAI.close();
    }
  }

  abstract String getModelName();

  abstract String getProjectId();

  abstract String getLocation();

  @AutoValue.Builder
  abstract static class Builder {

    abstract Builder setModelName(String name);

    abstract Optional<String> getModelName();

    abstract Builder setProjectId(String value);

    abstract Builder setLocation(String value);

    abstract GeminiAIClient autoBuild();

    final GeminiAIClient build() {
      if (!getModelName().isPresent()) {
        setModelName(MODEL_GEMINI_PRO);
      }
      return autoBuild();
    }
  }

請 Gemini AI 辨識影像

現在,讓我們將先前獲取影像的範例與此 Gemini AI 用戶端結合在一起,以要求它識別影像。

以下是我們先前看到的內容,但封裝在一個方便的方法中。它會接收 URL 的 List,並返回包含影像資料的 ImageResponsePCollection

  /**
   * Processes a list of raw image URLs into a {@link ImageResponse} {@link PCollection} using
   * {@link RequestResponseIO}. The resulting {@link KV#getKey} is the original image URL.
   */
  static Result<KV<String, ImageResponse>> imagesOf(List<String> urls, Pipeline pipeline) {

    Coder<KV<String, ImageResponse>> kvCoder = KvCoder.of(STRING_CODER, ImageResponseCoder.of());

    return requestsOf(urls, pipeline)
        .apply(
            ImageResponse.class.getSimpleName(),
            RequestResponseIO.of(HttpImageClient.of(), kvCoder));
  }

接下來,我們將 ImageResponse 轉換為 GenerateContentRequestPCollection

    // PCollection<KV<Struct, ImageResponse>> imagesKV = ...

    return imagesKV
        .apply(
            stepName,
            MapElements.into(requestKVType)
                .via(
                    kv -> {
                      String key = kv.getKey();
                      ImageResponse safeResponse = checkStateNotNull(kv.getValue());
                      ByteString data = safeResponse.getData();
                      return buildAIRequest(key, prompt, data, safeResponse.getMimeType());
                    }))
        .setCoder(kvCoder);

最後,我們將 GenerateContentRequestPCollection 應用到使用 GeminiAIClient 實例化的 RequestResponseIO(在上面定義)。請注意,我們使用的是 RequestResponseIO.ofCallerAndSetupTeardown,而不是 RequestResponseIO.ofofCallerAndSetupTeardown 方法只是告訴編譯器,我們正在提供 CallerSetupTeardown 介面的實作。

    //    PCollection<KV<Struct, GenerateContentRequest>> requestKV = ...
    //    GeminiAIClient client =
    //            GeminiAIClient.builder()
    //                    .setProjectId(options.getProject())
    //                    .setLocation(options.getLocation())
    //                    .setModelName(MODEL_GEMINI_PRO_VISION)
    //                    .build();

    return requestKV.apply(
        "Ask Gemini AI", RequestResponseIO.ofCallerAndSetupTeardown(client, responseCoder));

以下顯示了完整的端對端管道。

  /** Demonstrates using Gemini AI to identify a images, acquired from their URLs. */
  static void whatIsThisImage(List<String> urls, GeminiAIOptions options) {
    //        GeminiAIOptions options = PipelineOptionsFactory.create().as(GeminiAIOptions.class);
    //        options.setLocation("us-central1");
    //        options.setProjectId("your-google-cloud-project-id");
    //
    //
    //        List<String> urls = ImmutableList.of(
    //                "https://storage.googleapis.com/generativeai-downloads/images/cake.jpg",
    //                "https://storage.googleapis.com/generativeai-downloads/images/chocolate.png",
    //                "https://storage.googleapis.com/generativeai-downloads/images/croissant.jpg",
    //                "https://storage.googleapis.com/generativeai-downloads/images/dog_form.jpg",
    //                "https://storage.googleapis.com/generativeai-downloads/images/factory.png",
    //                "https://storage.googleapis.com/generativeai-downloads/images/scones.jpg"
    //        );

    // Step 1: Instantiate GeminiAIClient, the Caller and SetupTeardown implementation.
    GeminiAIClient client =
        GeminiAIClient.builder()
            .setProjectId(options.getProject())
            .setLocation(options.getLocation())
            .setModelName(MODEL_GEMINI_PRO_VISION)
            .build();

    Pipeline pipeline = Pipeline.create(options);

    // Step 2: Download the images from the list of urls.
    Result<KV<String, ImageResponse>> getImagesResult = Images.imagesOf(urls, pipeline);

    // Step 3: Log any image download errors.
    getImagesResult.getFailures().apply("Log get images errors", Log.errorOf());

    // Step 4: Build Gemini AI requests from the download image data with the prompt 'What is this
    // picture?'.
    PCollection<KV<String, GenerateContentRequest>> requests =
        buildAIRequests("Identify Image", "What is this picture?", getImagesResult.getResponses());

    // Step 5: Using RequestResponseIO, ask Gemini AI 'What is this picture?' for each downloaded
    // image.
    Result<KV<String, GenerateContentResponse>> responses = askAI(client, requests);

    // Step 6: Log any Gemini AI errors.
    responses.getFailures().apply("Log AI errors", Log.errorOf());

    // Step 7: Log the result of Gemini AI's image recognition.
    responses.getResponses().apply("Log AI answers", Log.infoOf());

    pipeline.run();
  }

以下顯示了執行完整管道的簡短輸出,其中我們看到了 Gemini AI 識別影像的結果。

KV{https://storage.googleapis.com/generativeai-downloads/images/chocolate.png, candidates {
    content {
        role: "model"
        parts {
            text: " This is a picture of a chocolate bar."
    }
}

KV{https://storage.googleapis.com/generativeai-downloads/images/dog_form.jpg, candidates {
    content {
        role: "model"
        parts {
            text: " The picture is a dog walking application form. It has two sections, one for information
                    about the dog and one for information about the owner. The dog\'s name is Fido,
                    he is a Cavoodle, and he is black and tan. He is 3 years old and has a friendly
                    temperament. The owner\'s name is Mark, and his phone number is 0491570006. He would
                    like Fido to be walked once a week on Tuesdays and Thursdays in the morning."
        }
    }
}

KV{https://storage.googleapis.com/generativeai-downloads/images/croissant.jpg
    content {
        role: "model"
        parts {
            text: " The picture shows a basket of croissants. Croissants are a type of pastry that is made
                    from a yeast-based dough that is rolled and folded several times in the rising process.
                    The result is a light, flaky pastry that is often served with butter, jam, or chocolate.
                    Croissants are a popular breakfast food and can also be used as a dessert or snack."
        }
    }
}