2.4. Amazon Simple Queue Service (SQS) への接続

インテグレーションは、 Amazon SQS キューからメッセージを取得したり、Amazon SQS キューにメッセージを送信したりすることができます。これには、Amazon SQS コネクションを作成し、そのコネクションをインテグレーションフローに追加します。詳細については、以下を参照してください。

2.4.1. Amazon SQS コネクションの作成

Amazon SQS コネクションをインテグレーションに追加するには、Amazon SQS コネクションを作成する必要があります。

前提条件

  • AWS アクセスキーが必要です。AWS クレデンシャルの取得 を参照してください。
  • SQS キューが存在するリージョンを知っている必要があります。作成する SQS コネクションは、このキューに対してメッセージを送受信します。

手順

  1. Fuse Online の左パネルで Connections をクリックし、利用可能なコネクションを表示します。
  2. Create Connection をクリックし、Fuse Online コネクターを表示します。
  3. Amazon SQS コネクターをクリックします。
  4. Access Key フィールドに、Amazon アクセスキー ID を入力します。これは、コネクションがアクセスするキューを管理する AWS アカウントのユーザーアクセスキーの一部です。

    コネクションがアクセスするキューが存在しない場合は、Fuse Online がインテグレーションの実行を開始しようとするときに、そのアクセスキーに関連する AWS アカウントを使用してキューを作成します。しかし、他の AWS アカウントにキューが存在する場合は、コネクションはキューを作成できず、インテグレーションを開始できません。

  5. Secret Key フィールドに、指定した Amazon アクセスキー ID の Amazon シークレットアクセスキーを入力します。
  6. Region フィールドに、キューが存在する AWS リージョンを選択します。コネクションがキューを作成する場合は、作成したリージョンにキューが作成されます。
  7. Validate をクリックします。Fuse Online は即座にコネクションの検証を試み、検証の成功または失敗を示すメッセージを表示します。検証に失敗した場合は、必要に応じて設定の詳細を修正し、再度検証を行います。
  8. 検証に成功したら Next をクリックします。
  9. Name フィールドに、このコネクションを別のコネクションと区別するために使用する名前を入力します。たとえば、SQS South を入力します。
  10. Description は任意のフィールドで、このコネクションに関する便利な情報を入力します。たとえば、Sample SQS connection that sends messages to our southern region queue. を入力します。
  11. Save をクリックし、作成したコネクションが利用可能になったことを確認します。上記の例の名前を入力した場合は、インテグレーションに追加できるコネクションとして SQS South が表示されます。

2.4.2. Amazon SQS からメッセージを取得してインテグレーションの実行をトリガー

Amazon SQS キューからメッセージを取得してインテグレーションを開始するには、Amazon SQS コネクションをシンプルなインテグレーションの最初のコネクションとして追加します。

前提条件

  • メッセージの取得元のキューにアクセスするよう設定されている Amazon SQS コネクションが作成済みである必要があります。
  • コネクションがメッセージを取得する SQS キューの名前を知っている必要があります。

手順

  1. Fuse Online パネルの左側にある Integrations をクリックします。
  2. Create Integration をクリックします。
  3. Choose a connection ページで、インテグレーションを開始するために使用する Amazon SQS コネクションをクリックします。
  4. Choose an action ページで、Poll an Amazon SQS Queue を選択し、SQS キューからメッセージを定期的に取得します。
  5. このアクションを設定するには以下を行います。

    1. Delay フィールドで、デフォルトの 500 ミリ秒をポーリング間の経過時間として受け入れます。別のポーリング間隔を指定する場合は、数値を入力し、時間の単位を選択します。
    2. Maximum Objects to Retrieve フィールドに、1 回のポーリング操作で取得可能なメッセージの最大数を入力します。デフォルトは 10 です。

      取得するメッセージの数を無制限にするには、0 または負の整数を指定します。Maximum Objects to Retrieve が無制限の場合、ポーリングは SQS キューのすべてのメッセージを取得します。

    3. Queue Name or Amazon Resource Name フィールドに、メッセージを取得する SQS キューの名前またはキューの ARN を指定します。この名前を持つキューが存在しない場合は、コネクションがこの名前でキューを作成します。必須のフィールドはこのフィールドのみです。
    4. Obtain messages and then delete them from the queue (メッセージを取得した後にキューから削除) を指定するかどうかを示します。メッセージを取得した後にキューから削除するのはデフォルトの動作です。メッセージを取得し、キューにそのメッセージを残す場合のみ、このオプションの選択を解除します。キューに取得済みのメッセージを残すと、再度取得することになります。
    5. Obtain messages and delete the message if it does make it through a Camel filter (メッセージを取得し、Camel フィルターを通過した場合はメッセージを削除) を指定するかどうかを示します。このオプションはデフォルトで選択されます。取得したメッセージをフィルターに通し、キューから削除するかどうかを判断する場合はデフォルトのままにします。Obtain messages and then delete from the queue の選択を解除する必要はありません。Obtain messages and delete the message if it does make it through a Camel filter を選択すると、Fuse Online によって正しい動作が実行されます。
  6. Next をクリックします。

結果

インテグレーションフローの最初にコネクションが表示されます。

次のステップ

Obtain messages and delete the message if it does make it through a Camel filter を選択した場合、この SQS コネクションの後にフィルターステップを追加する必要があります。このフィルターステップは SQS コネクションの直後に追加する必要はありません。すべてのコネクションをインテグレーションに追加した後、フィルターステップを追加します。実行中にフィルターを通過したメッセージがあると、Fuse Online は同じコネクションである SQS の最初のコネクションを使用して、これらのメッセージをキューから削除します。

2.4.3. Amason SQS キューへのメッセージの送信

フローの途中でメッセージを Amazon SQS キューに送信したり、メッセージを Amazon SQS キューに送信してシンプルなインテグレーションを終了することができます。

前提条件

  • メッセージの送信先のキューにアクセスするよう設定されている Amazon SQS コネクションが作成済みである必要があります。
  • コネクションがメッセージを送信する SQS キューの名前を知っている必要があります。
  • Fuse Online はインテグレーションの追加を要求したり、シンプルなインテグレーションの最後のコネクションを選択するよう要求します。

手順

  1. Add to Integration ページで、コネクションの追加先のプラス記号をクリックします。Fuse Online が最後のコネクションを選択するよう要求する場合は、このステップをスキップします。
  2. メッセージの送信に使用する Amazon SQS コネクションをクリックします。
  3. コネクションが実行するアクションを選択します。

    • Send Object を選択すると、1 つのメッセージをキューに送信します。
    • Send Batch Object を選択すると、メッセージのバッチをキューに送信します。
  4. FIFO (先入れ先出し) キューの場合は、Message group ID strategy フィールドで ConstantMessageGroupIdStrategy を使用するか、フィールドをクリックして別のストラテジーを選択します。

    シンプルなキューでは、このパラメーターを設定しても影響はありません。メッセージグループ ID は、メッセージが属するグループを識別します。同じメッセージグループに属するメッセージは、常にメッセージグループの厳密な順序で 1 つずつ処理されます。メッセージグループ ID ストラテジーは、コネクションがグループ ID をメッセージに割り当てる方法を判断します。オプションは次のとおりです。

    • ConstantMessageGroupIdStrategy: コネクションは定数を使用してメッセージをグループ化します。
    • ExchangeIdMessageGroupIdStrategy: コネクションは、各メッセージのエクスチェンジ ID を使用してメッセージをグループ化します。
    • PropertyValueMessageGroupIdStrategy: コネクションは内部プロパティーの値を使用してメッセージをグループ化します。

    FIFO キューは、メッセージが送受信される順番が厳密に保持され、各メッセージが 1 度だけ処理されるように設計されています。FIFO キューの名前には、.fifo が接尾辞として付きます。

  5. Message deduplication ID strategy フィールドの FIFO キューに NullMessageDeduplicationIdStrategy を使用するか、フィールドをクリックして ExchangeIdMessageDeduplicationIdStrategy を選択します。

    シンプルなキューでは、このパラメーターを設定しても影響はありません。FIFO キューでは、コネクションは選択したストラテジーを使用して、重複したメッセージをキューに送信しないようにします。

    • NullMessageDeduplicationIdStrategy はメッセージのボディー部を重複排除要素として使用します。つまり、コネクションはメッセージのボディー部を比較して、重複メッセージを特定します。
    • ExchangeIdMessageDeduplicationIdStrategy はメッセージのエクスチェンジ ID を重複排除要素として使用します。つまり、コネクションはエクスチェンジ ID を比較して重複メッセージを特定します。
  6. Queue Name or Amazon Resource Name フィールドに、メッセージを送信するキューの名前を入力します。キューが存在しない場合は、コネクションが作成します。
  7. Delay フィールドで、0 (遅延なし) のデフォルトを使用するか、数字を指定して遅延を追加します。または、遅延を追加するには数字を指定します。インテグレーションは、この秒数後、メッセージを送信します。これは、メッセージコンシューマーがコネクションが送信するメッセージにすぐに対応できない場合に便利です。
  8. Next をクリックします。

結果

インテグレーションフローで追加した場所にコネクションが表示されます。