less is more

心のフルスタックエンジニア👨‍💻バイブスでコードを書いています🤘

SQSを完全に理解する🧘‍♀️

AWS SQS

Amazon Simple Queue Service とは

f:id:bluepixel:20200412151259p:plain

デフォルト

全部デフォルトでいいならこれだけのテンプレートでキューが作成できる。

AWSTemplateFormatVersion: "2010-09-09"
Resources:
  SampleQueue:
    Type: AWS::SQS::Queue

名前すら指定する必要はない。

名前を指定しなかった場合、一意のIDで初期化される。

例:sample-sqs-SampleQueue-1UGQNZZICLA41

{スタック}-{リソースの論理ID}-{ランダムなID} となる模様。

リソースの戻り値はURLとなっている。 URLの形式は https://sqs.{リージョン}.amazonaws.com/{アカウントID}/{キューの名前} となる。

その他のデフォルト値は以下の通り。

  • キュータイプ:スタンダード(=通常キュー)
  • 配信遅延: 0秒
  • コンテンツに基づく重複削除:なし
  • 可視性タイムアウト:30秒
  • メッセージの保持期間:4日
  • 最大メッセージサイズ:256KB
  • メッセージ受信待機時間:0秒

キューの名前

今度は明示的に名前を指定してみる。

AWSTemplateFormatVersion: "2010-09-09"
Resources:
  SampleQueue:
    Type: AWS::SQS::Queue
    Properties: 
      QueueName: my-sample-q

QueueNameは Update requires: Replacementなので、 すでにあるキューに対して名前の変更をする場合は、 元からあるキューが削除されてから新しいキューが作成されるので注意。

後述するが、FIFOキューを作成する場合は、名前の末尾を .fifo とする必要がある。 自動ではやってくれないので、.fifo を含めた名前を指定すること。

ちなみに通常キューに .fifoサフィックスを指定すると以下のエラーが発生する。

Can only include alphanumeric characters, hyphens, or underscores. 1 to 80 in length (Service: AmazonSQS; Status Code: 400; Error Code: InvalidParameterValue; Request ID: 3f988dc0-e1f9-5a41-8804-db35814c4035)

キューができたので試しにメッセージを出し入れしてみる。

$ aws sqs send-message --queue-url $QUEUE_URL --message-body "Hello world"
{
    "MD5OfMessageBody": "3e25960a79dbc69b674cd4ec67a72c62",
    "MessageId": "a90f375d-b572-4522-a480-d4d9ec5cd80a"
}

メッセージIDが返ってきたので、ちゃんとエンキューできている。

今度はデキュー。

といっても通常キューなので、順番に取り出せるわけではない。

$ aws sqs receive-message --queue-url $QUEUE_URL
{
    "Messages": [
        {
            "MessageId": "a90f375d-b572-4522-a480-d4d9ec5cd80a",
            "ReceiptHandle": "AQEBhAMtoAU5oNgqMj9EHflyKmcGkdpR/ln/o/bxIF5KMtYOC4eEMSdlsGfVW7fFzB8yddX6vPJnmB79OXcXVUtCoJBIyU8cX2s6HiEcspu85pyYTmpZlNXoUuEoV7IMZ77QamdpOafm8i6qoTTbJfgYNW7sDMoMaV0ZjDpaMZ8mdJ8EvEhtQW1hqhaP9MxLjVA02U4/tUwm5LD9yu9hQNeVm1B6E46rbZnAG1j28+NSm02NwpcjAnwYiybSYcLKzOjA51PplU678W4L+9C+fC+zYacTOO3jGFGEnx8NY5lfH+fBVZY7k9Tn//A3gtKb1Whhbl7H4I8x/7b+Xhc+f+uqIRrVTi8woHxuevKAINn7POyXlar98xtsGW67T0T7JZCj+G9FZURSmjfvxIsGDkv/KQ==",
            "MD5OfBody": "3e25960a79dbc69b674cd4ec67a72c62",
            "Body": "Hello world"
        }
    ]
}

次はメッセージの削除。

放っておけばメッセージの保持期間が過ぎた後に自動的に削除されるが、その間コンシューマーから受信され続けることになり、二重に処理が行われてしまうので、処理が正常に終了したら必ず明示的に削除すること。

削除に使うのは、先のレスポンスに含まれている ReceiptHandle

これはユニークな値だが、同じメッセージでも受信するたびに値が変わる。

The ReceiptHandle is associated with a specific instance of receiving a message. If you receive a message more than once, the ReceiptHandle is different each time you receive a message.

メッセージの削除には最新の受信ハンドルを使わなければいけない。 古い受信ハンドルを使った場合、リクエストは成功するが、実際メッセージが削除されない可能性があることに注意。

When you use the DeleteMessage action, you must provide the most recently received ReceiptHandle for the message (otherwise, the request succeeds, but the message might not be deleted).

また、後述の 可視性タイムアウト の時間内に削除を行わないと、受信ハンドルは無効になる。再度メッセージを受信して、取得した新しい受信ハンドルを使って削除リクエストを行う。

$ aws sqs delete-message --queue-url $QUEUE_URL --receipt-handle AQEBhAMtoAU5oNgqMj9EHflyKmcGkdpR/ln/o/bxIF5KMtYOC4eEMSdlsGfVW7fFzB8yddX6vPJnmB79OXcXVUtCoJBIyU8cX2s6HiEcspu85pyYTmpZlNXoUuEoV7IMZ77QamdpOafm8i6qoTTbJfgYNW7sDMoMaV0ZjDpaMZ8mdJ8EvEhtQW1hqhaP9MxLjVA02U4/tUwm5LD9yu9hQNeVm1B6E46rbZnAG1j28+NSm02NwpcjAnwYiybSYcLKzOjA51PplU678W4L+9C+fC+zYacTOO3jGFGEnx8NY5lfH+fBVZY7k9Tn//A3gtKb1Whhbl7H4I8x/7b+Xhc+f+uqIRrVTi8woHxuevKAINn7POyXlar98xtsGW67T0T7JZCj+G9FZURSmjfvxIsGDkv/KQ==

削除リクエストには何もレスポンスはない。

ちなみに削除後でも、少しの間削除済みのはずのメッセージを受信することが稀にある。

これはSQS側の冗長化に起因する問題で、全てのサーバーから削除されるまでに若干の時間がかかるため。

SQSを使うアプリケーションの設計では、常に冪等性を意識する必要がある。

You should ensure that your application is idempotent, so that receiving a message more than once does not cause issues.

次に、キューに設定できる各種オプションについて見ていく。

配信遅延

メッセージがキューに入ってから、コンシューマーが受信できる状態になるまでの猶予。

0〜900秒(15分)の間で指定できる。

デフォルトでは0秒なので即時に受信可能になるが、60秒に設定した場合、59秒間は受信のリクエストを送っても何も返ってこない。

AWSTemplateFormatVersion: "2010-09-09"
Resources:
  SampleQueue:
    Type: AWS::SQS::Queue
    Properties: 
      DelaySeconds: 60

使い道はロールバックのための猶予とかだろうか🤔

例えば複数のシステム処理を行う必要がある業務で、アプリケーションがマイクロサービスに分かれていてトランザクション管理が難しい場合。全ての処理が成功してから行いたい処理をキューに入れておいて、もし何かが失敗したらキューを削除してロールバックするとか。

即時で処理することに問題がない場合は基本0秒でいいと思います。

可視性タイムアウト (visibility timeout)

メッセージが一時的に見えなくなる時間。

0~43,200 秒 (12 時間) の間で指定できる。デフォルトで30秒。

あるコンシューマーがメッセージ受信したら、この秒数の間は、他のコンシューマーはこのメッセージを受信できない。

AWSTemplateFormatVersion: "2010-09-09"
Resources:
  SampleQueue:
    Type: AWS::SQS::Queue
    Properties: 
      VisibilityTimeout: 15

これは、1つのメッセージが2回以上配信されるのを防ぐための機構。(が、保証はされず、稀に起こることもある。SQSの仕様上これは避けられない。)

コンシューマーは、この可視性タイムアウトの間に処理を完了して、メッセージを削除する必要がある。なので、処理に最大どれくらいかかるのかを見積もって時間を設定する。

削除前にタイムアウトが来たら、メッセージは再び受信可能な状態になる。

可視性タイムアウトはキュー全体で一つの値を設定するのだがそれ以上に処理が長引きそうな場合、個別のメッセージごとに延長することもできる。

タイムアウトになる前に受信ハンドルを使って、任意のタイムアウト値を新しく設定する。

#! /bin/bash
aws sqs receive-message --queue-url $QUEUE_URL > ./_tmp.json
receipt_handle=`cat ./_tmp.json | jq -r '.Messages[0].ReceiptHandle'`
message_body=`cat ./_tmp.json | jq -r '.Messages[0].Body'`
echo $receipt_handle
echo $message_body
sleep 20s
aws sqs change-message-visibility --queue-url $QUEUE_URL --receipt-handle $receipt_handle --visibility-timeout 300

新しいタイムアウト値は設定が完了してからの計測となり、すでに経過した時間は関係ない。

つまり、デフォルト30秒のタイムアウトで、20秒が経過してから新たに60秒のタイムアウト値を設定した場合、そこからさらに60秒が経過したらタイムアウトとなる。すでに経過した20秒はカウントされない。

この新たなタイムアウト値は、そのメッセージが不可視状態の間のみ適用される。 例えば延長後にタイムアウトし、再度そのメッセージを受信した時は、キューのデフォルトの可視性タイムアウト値が設定されている。

ちなみに、 change-message-visibility で0秒を設定した場合は、即時で可視状態になる。

メッセージ保持期間

メッセージがキューから削除されるまでの期間。

明示的に削除しない場合は、当該期間の経過後に自動で削除される。

60 ~1,209,600秒 (14 日) の間を指定。デフォルト値は 345,600 秒 (4 日) 。

AWSTemplateFormatVersion: "2010-09-09"
Resources:
  SampleQueue:
    Type: AWS::SQS::Queue
    Properties: 
      MessageRetentionPeriod: 300

この期間は、後述のデッドレターキューの保持期間にも直接影響する。 詳細は後述。

最大メッセージサイズ

メッセージに含められるデータの最大値。

1,024 バイト (1 KiB) ~ 262,144 (256 KiB) の間で指定。デフォルトは262,144 (256 KiB)。

わざわざ小さく制限するユースケースが思いつかない🤔

Resources:
  SampleQueue:
    Type: AWS::SQS::Queue
    Properties: 
      MaximumMessageSize: 15

最大値以上のサイズのメッセージを送ろうとすると以下のエラーが発生する

print "a" * 15361
$ aws sqs send-message --queue-url $QUEUE_URL--message-body $(ruby tmp_ruby.rb)

An error occurred (InvalidParameterValue) when calling the SendMessage operation: One or more parameters are invalid. Reason: Message must be shorter than 15360 bytes.

ロングポーリング

ReceiveMessageWaitTimeSeconds で指定する値が0の場合を ショートポーリング、1以上の場合を ロングポーリング と呼ぶ。

デフォルトは0秒なのでショートポーリング。最大20秒まで指定可能。

ショートポーリングの場合、キューに1つも受信可能なメッセージがない時は、コンシューマがメッセージの受信リクエストを送ると即時に空のレスポンスが返されるが、ロングポーリングの場合は、指定の秒数だけ待機する。

Resources:
  SampleQueue:
    Type: AWS::SQS::Queue
    Properties: 
      ReceiveMessageWaitTimeSeconds: 20

リクエスト数が大幅に削減できるので、基本的にはロングポーリングが望ましい。 AWS側も特に理由がなければロングポーリングを使うことを推奨している。

利用可能なメッセージが現れた場合は、即時にコンシューマに送信される。 例えば20秒の待機中、10秒経過後にメッセージが利用可能になった場合、 残りの10秒を待機することなくメッセージを受信するようになっている。

デッドレターキュー(Redrive policy

メッセージの処理は失敗する可能性があります。 アプリケーション側のバグが原因のこともあれば、ハードウェアやネットワークの問題でメッセージが破損・消失することもありえます。

そのためのエラーの記録、ハンドリングに使えるのがこれ。

処理が指定の回数分失敗したメッセージを、別のキューに移動することができる。

元のキューを ソースキュー、移動する先のキューを デッドレターキュー と呼ぶ。

AWSTemplateFormatVersion: "2010-09-09"
Resources:
  SampleQueue:
    Type: AWS::SQS::Queue
    Properties: 
      RedrivePolicy:
        deadLetterTargetArn: !GetAtt SampleDeadQueue.Arn
        maxReceiveCount: 3
  SampleDeadQueue:
    Type: AWS::SQS::Queue
    Properties: 

※デッドレターキューはソースキューより先に作成する必要があるが、!GetAtt で参照すれば依存関係をAWS側で勝手に解決してくれる。

マネジメントコンソールでは、再処理ポリシー の欄になる。 f:id:bluepixel:20200412150050p:plain

取得して削除せずに放置 => タイムアウト を3回繰り返すと、デッドレターキューに移動される。エラーハンドリングを行うコンシューマーを用意して、デッドレターキューをサブスクライブするのもよいし、CloudWatch Alarmを設定して検知だけするという使い方もできる。

通常キューは、デッドレターキューもまた通常キューで作成する必要がある。FIFOキューも同様にFIFOキューで作る。また、リージョンも同じ場所に置かなければならない。

最後に注意点として、デッドレターキューのメッセージの保持期限について気をつけるべきことがある。

期限のスタートは、ソースキューにメッセージが保存されたタイミングのタイムスタンプとなる。両方のキューの保持期限が4日だとして、1日経ってからデッドレターキューに移動した場合、残りの有効期限は、移動してから4日ではなく、3日となる。そのため、デッドレターキューの方の期限をソースキューより長めにするのが推奨。

CloudWatch Alarmを使ったポイズンピルメッセージ検知のサンプル

ポイズンピルメッセージ とは、処理できないメッセージのこと。

AWSTemplateFormatVersion: "2010-09-09"
Parameters:
  AlarmEmail:
    Type: String
    Description: alarm email address
Resources:
  SampleQueue:
    Type: AWS::SQS::Queue
    Properties: 
      QueueName: my-sample-q
      RedrivePolicy:
        deadLetterTargetArn: !GetAtt SampleDeadQueue.Arn
        maxReceiveCount: 3
  SampleDeadQueue:
    Type: AWS::SQS::Queue
    Properties: 
      QueueName: my-sample-dead-q
  AlarmTopic: 
    Type: AWS::SNS::Topic
    Properties: 
      TopicName: my-sample-queue-topic
      Subscription: 
        - Endpoint: !Ref AlarmEmail
          Protocol: email
  Alarm: 
    Type: AWS::CloudWatch::Alarm
    Properties: 
      AlarmDescription: dead letter queue alarm sample
      Namespace: AWS/SQS
      MetricName: ApproximateNumberOfMessagesVisible
      Dimensions: 
        - Name: QueueName
          Value: !GetAtt SampleDeadQueue.QueueName
      Statistic: Sum
      Period: 60
      EvaluationPeriods: 1
      Threshold: 3
      ComparisonOperator: GreaterThanThreshold
      AlarmActions: 
        - !Ref AlarmTopic
      InsufficientDataActions: 
        - !Ref AlarmTopic

60秒の評価期間の間に、デッドレターキューに可視状態のメッセージが4つ以上見つかれば、指定のメールアドレスにアラートを飛ばす。

FIFOキュー

だいたい通常キューについては説明できたので、次はFIFOキューの使い方。

Resources:
  SampleQueue:
    Type: AWS::SQS::Queue
    Properties: 
      QueueName: my-sample-q.fifo
      FifoQueue: true

キュー名は、 .fifo で終える必要がある。

FifoQueue: true なのに名前が *.fifo ではない場合は以下のエラーになる。

The name of a FIFO queue can only include alphanumeric characters, hyphens, or underscores, must end with .fifo suffix and be 1 to 80 in length. (Service: AmazonSQS; Status Code: 400; Error Code: InvalidParameterValue; Request ID: 0b04da81-c756-5ed6-8458-8c77dfc352f2)

また、通常キューをFIFOキューに変更したり、FIFOキューを通常キューに変更することはできない。CloudFormationを使う場合は、キュー名を指定しているかどうかでこの動作は異なる。

  • 名前を未指定で作成していた場合 => 可能。既存のキューを削除して新しくキューを作成する物理リソースの置換となる。
  • 名前を指定して作成していた場合 => 不可能。キュータイプの変換はリソースの置換が必要な操作のため。別のスタックとして作成する必要がある。

名前を指定すると、このリソースの置換が必要な更新はできません。中断が不要であるか、一定の中断が必要な更新であれば、行うことができます。リソースを置き換える必要がある場合は、新しい名前を指定します。

FIFOキューは、通常キューができることを全てサポートしている。 以下から、FIFOキュー特有の性質・機能を確認していく。

スループット

通常キューよりは制限される。

APIアクションごとにクォータが設けられており、メッセージの送信/受信/削除それぞれ、1秒あたり最大300件のメッセージを処理できる。ただし、APIバッチ処理をサポートしているので(1回のリクエストあたり最大10件のメッセージ)、理論上は最大3000件を処理できる。

(※受信可能なメッセージが10件以上あり、10件取得のバッチリクエストを送った場合でも、取得されるメッセージ数はそれ以下になる可能性があるので、あくまで全てうまくリクエストできた場合の理論値。)

メッセージの順序

通常キューではベストエフォート型、つまり、「なるべく送信された順でメッセージを返すようにするけど保証はできないよ」というポリシーなのに対して、FIFOキューは、順番を厳密に保証する。送信された順番にキューにメッセージを保存し、受信リクエストが来た時は、必ず保存された順番通りにメッセージを返却する。

重複の排除

通常キューでは、同じメッセージが2回以上処理されることがあるが、 FIFOキューでは常に1回限りの配信を保証する。

動作確認

--message-deduplication-id--message-group-id 一旦気にしなくてよい。

#!/bin/sh

for i in `seq 10`
do
  aws sqs send-message --queue-url $QUEUE_URL --message-body "i: $i" --message-deduplication-id $i --message-group-id "hoge"
done
$ sh send_message_fifo.sh
{
    "MD5OfMessageBody": "342fa06dbd9a92acf61e543c3263597d",
    "MessageId": "c0cfce3b-228e-4d08-b4c9-5014e2ed748d",
    "SequenceNumber": "18852554228863727616"
}
{
    "MD5OfMessageBody": "d59f0305126b908a0759f4afba832d3d",
    "MessageId": "8e4a6141-3dd4-43c7-9653-ccbe488756a0",
    "SequenceNumber": "18852554229023727616"
}
{
    "MD5OfMessageBody": "0ae31233b58dce299941dfec44f9e974",
    "MessageId": "814e3277-a99b-475c-ab50-899e6a49511e",
    "SequenceNumber": "18852554229180911872"
}
{
    "MD5OfMessageBody": "b4c9b75a632f11b1b42ea5307e5147c8",
    "MessageId": "d95d0ab1-5986-4e97-bf88-ae7e67266a3b",
    "SequenceNumber": "18852554229334511616"
}
{
    "MD5OfMessageBody": "ed07616b6de421eee6aba693ffc86e8f",
    "MessageId": "61f35be0-f323-4739-aa4f-41284132ed2b",
    "SequenceNumber": "18852554229485040384"
}
{
    "MD5OfMessageBody": "8a32c5bec4a80d60b5829eec83f2bedc",
    "MessageId": "ef91b0f6-e677-43a8-b166-6789271701dd",
    "SequenceNumber": "18852554229636592128"
}
{
    "MD5OfMessageBody": "87eb3aef4e0dfb7cdfbdf35c3c40a6d9",
    "MessageId": "8bde6af3-9f68-409b-8f59-82b8c8fafe16",
    "SequenceNumber": "18852554229785327872"
}
{
    "MD5OfMessageBody": "11f1baece959c0e5461da92542d33111",
    "MessageId": "6d797ebd-804a-4bbe-a140-4f5b928c0be1",
    "SequenceNumber": "18852554229937648128"
}
{
    "MD5OfMessageBody": "e4b31e207828c82c6581ccc20daa1c9a",
    "MessageId": "5e642d2c-be26-411e-8eab-87a40676a7a9",
    "SequenceNumber": "18852554230094319616"
}
{
    "MD5OfMessageBody": "aade5c894d4283088c7a1cb3194dbdbb",
    "MessageId": "f32eb109-4f9a-422d-be7d-34ab8dfdc1c4",
    "SequenceNumber": "18852554230247663616"
}

バッチで10個のメッセージを取得する。

$ aws sqs receive-message --queue-url $QUEUE_URL --max-number-of-messages 10
{
    "Messages": [
        {
            "MessageId": "c0cfce3b-228e-4d08-b4c9-5014e2ed748d",
            "ReceiptHandle": "AQEBiS3SLNrDlVPxPH8qqSpJePrhECYM2JpV+kC7uZdCBFv5n8OiE2PK89eL4EvkRhXefeoUD3gAR3ge18OIfdnTzILMgaRGojKpolYP5m1L373sakR5CnSmATj6cyYqPD2aIxLXT3aQThM4bzwkQwqcibG5OH2pGwbmCnwSEVpLuYmifV5inRFGtlahZ1w2xVzwYu3sEjTUfxyXxJm5XCxCTEDwv1gqayLzQLURh8q2K62/WcojEG4PDP6O3dmQji7j1FvYuTfb5Rx63pGefaUz5GzwQoG4UJ0GGyXWYxAumco=",
            "MD5OfBody": "342fa06dbd9a92acf61e543c3263597d",
            "Body": "i: 1"
        },
        {
            "MessageId": "8e4a6141-3dd4-43c7-9653-ccbe488756a0",
            "ReceiptHandle": "AQEBabOhZG+mHmvWJMrGzhhN4kxjNRbDmwfDKni9484/WXQWmzIh+scve3OHMltWlBr47KuKurFrngxJKb3VRppuzk7D+vRj3KfFUFGWJ0khBx+UU/YJdYB3HLh6ZwesUdjTLFpDAoLTW8uDAAQ+04EtJtafMWC8rSKUB1WdGKRRfEggtiJYLj0S0ROix5Bf7CS8xK9crQXjo+DSCxpU1dt3YWOslabxBRhfU3vt94Qp1NZgrIyQwc5Ow+LWBqZCWWxbGH3OurLAI0N/yZ2s8dhwab7bh0ncL97Rwgpmvvzkl0k=",
            "MD5OfBody": "d59f0305126b908a0759f4afba832d3d",
            "Body": "i: 2"
        },
        {
            "MessageId": "814e3277-a99b-475c-ab50-899e6a49511e",
            "ReceiptHandle": "AQEBshvrQS16V6e4nKPVeclmCU07YbEaXu81nJNeYtbrsXk/7qWkRqFPn2xu4H6n7iYQSgf7yR9/fRtFUl9TBqbkc40e2A4zmWGi/8/CU0GC4uq+wRDrr0RPkWkXujXspZanhMsybJlA4mQiAwhSxEqcmiBLj/Us3pV2ljf6zPToP+kBuB7afbCg+q8CixsiryoRDI2vlZ2jvR70B1CGcnmInJ5tfv8Vk1PSP0poQ0aLbUifj11bJM3JlNsEtF4AeI3msPB+8tmTXka+aJJVlNZswxu3Q7UrlL1PPkY83AlIFro=",
            "MD5OfBody": "0ae31233b58dce299941dfec44f9e974",
            "Body": "i: 3"
        },
        {
            "MessageId": "d95d0ab1-5986-4e97-bf88-ae7e67266a3b",
            "ReceiptHandle": "AQEBdDbCRzqsC4lyAEGBtZk1zgecol4wzMdmyf/dulZvNeBCeamE2e9T1JyXzqBvq0gze8w224eI3Pm5G2wC2HddpD8h4xNb5764e0YJDWpO+JCLS70LHEUY/dfAMIKcoyJOjvmJJuPrYHvdJ/4iAQ/gjWyt0egX0xw6fF2S6FNtbBY2cdy1jizYIT0VqFKTgh8dKSmNY2lGTLFPILDZfctuBA4GlaRlfD8kgDvWLuvAKiLe7KWllY4phNEJyBGDjro6ZByJpRfg7pTfJdbNGsRh/zx3sWJhlUL/g7zuqTOUV6Y=",
            "MD5OfBody": "b4c9b75a632f11b1b42ea5307e5147c8",
            "Body": "i: 4"
        },
        {
            "MessageId": "61f35be0-f323-4739-aa4f-41284132ed2b",
            "ReceiptHandle": "AQEBYqg7HxFHrllSRcw21wERy5qx7wy2okZYubQIGkizehF5AAVZvSi5v3kVCstxpFAiRg+Q5s1G7CRtDrtkDrLkjGiXbIptTzY+J7XQdOV8U8ejhaJ8AiqzT/grQiNgjSHpIqSQ8giOn8l5vo3dcOLqRneUBnTnRnTSydhcXuayXMv9/qU+emoDwNW2HetGORL6PlZLfYPwSDPJHzyY0coZ/vH6QQExBBb7mH4WRdyhrwMvORVD+ZujXtYzK9Tot3X5ExU6XE7ERAxuhYxaockq2Z4URaSmEYfPtITGldUMKh0=",
            "MD5OfBody": "ed07616b6de421eee6aba693ffc86e8f",
            "Body": "i: 5"
        },
        {
            "MessageId": "ef91b0f6-e677-43a8-b166-6789271701dd",
            "ReceiptHandle": "AQEBkHUPlArNOSgZcDW0WoP9IOcoIN2z0Lx4k941vU1EiS4ibgn9zhdKh9VY4g6eeVdyQkCNCXw8aDNsk0uMCE8YQsaqOwGjM0r7bawiC97Fo5SvcjDxMHzEPJtetzKL4zm5DvpD0f+EEDUmVgPcYFoJcC95QrOfUC7elXcis2Sd6aDj0/fZB4ZpPsSdaiQKkVZBHAAh+5msTajHbel66ml8oALstgO9uXxLP7HWfdQJO4Rxii+3LtuU8kk16IdYFKp1y1O8iF9Qv5kl08w+aM3rF2GCGFmFkOob/lCV7DnSBVE=",
            "MD5OfBody": "8a32c5bec4a80d60b5829eec83f2bedc",
            "Body": "i: 6"
        },
        {
            "MessageId": "8bde6af3-9f68-409b-8f59-82b8c8fafe16",
            "ReceiptHandle": "AQEBx0UQN9I+K45q4UHzAjoucd7GS2TBxSen9CRgD75EXpA0ymgG3WFEfkHg5kvnbsiF0EwIy+XCnsE0N2AwYkodvSdmX8G0y4PFRc06hWzkyAXSj6fjxv6x4C3PB5EcMccuKaRsNBfoXzr2aNRwZmKueE8pV485BEoi0XGn0qCHhZLGBvn2jVtdZiE2+kC2rbSk2fkvwtu4/9T5pR3OdVKz2D2Pqn7VoAcZptdVdN7WX4+fEmzX/+UOySvgVm1kU3rCecrmPQp3E8m9oj4H2xr3NZCEyqybMfuguC//Si6Vt9o=",
            "MD5OfBody": "87eb3aef4e0dfb7cdfbdf35c3c40a6d9",
            "Body": "i: 7"
        },
        {
            "MessageId": "6d797ebd-804a-4bbe-a140-4f5b928c0be1",
            "ReceiptHandle": "AQEB3ICr4KzT9Y1kzv0pDoH9MW8ZTqGkupLwS8pt7BOJgbjbzXVyLuG1kq/c6l2fcT8i3yi+cgR2e5XNcQS0y66E5SiD5nBkP4V9CF6MWDFFe/2B03KivdNG6koNwyf18XrMXg0yILlCptedxj9ZEwF3OMNt/WL0zXdN8NCIrzXOTYXwTHNVptgjp3/y9jhwEU5jsvvNdeJgJ8llZbzH8kTUTMYtnQ5cAqL1TT6MUxYK+m3hnz3IRSAZH4Qsqkp1Mhc25QUJ0DGANXsPW7477P/zO5YknPsgK/Lr7wOjblwwVJM=",
            "MD5OfBody": "11f1baece959c0e5461da92542d33111",
            "Body": "i: 8"
        },
        {
            "MessageId": "5e642d2c-be26-411e-8eab-87a40676a7a9",
            "ReceiptHandle": "AQEBkEha4icxngraR2kDc5Q1s+LVBg/ewbS75Q6aHk7ugOxiETeiLtcyq9QgagtMvWkxxrq1FIBYQ9juCCKYHouqw5OMuBKSbcVBvwi+A2fgQQB45K4Hm7toJWmiT/nnKYuXyicXyQWZbeW0U2QXVusKByRB3+pGwf5VWTzkb93XOYQStlxnhyd3ZPzkTSmKhYkWPh2FAqYa5Eyo2IA6mKOmrDT+B3X999gA0ZRmM8wRJVWblJgdgh2stwzD8QZhivxCBU8YtvQyhaOuniGbCeUmWerX+4KUXYRtIfibdWh+0ZA=",
            "MD5OfBody": "e4b31e207828c82c6581ccc20daa1c9a",
            "Body": "i: 9"
        },
        {
            "MessageId": "f32eb109-4f9a-422d-be7d-34ab8dfdc1c4",
            "ReceiptHandle": "AQEBcVKdsyQr2QMJYhCOBECe7NnE9qb/5EM9V6NjwXZaspZvc7Zu7/YwnGRKSHCAJcpjjgAYNecttG9qsa4hWy4SkbduGxbZWXDFMvX48lQwJhdp8eP9z/SkunFspZLsOAeRSgRHUV0UBCKdSdlzUFnJfXkGxlHdx1v000d2eo2vvmsviTX/Qq0tHz/NbLmVFc3p6CGQRicwKV8d/OVhHH+ysrqEIV7CFk5mk7HAHUgZSd6DP5lyaUCPUQpASQi48l7AGUzh/47DJL1djrUFpNnazO1i6YeUMOXtCaaOLzb+BpA=",
            "MD5OfBody": "aade5c894d4283088c7a1cb3194dbdbb",
            "Body": "i: 10"
        }
    ]
}

順番通りに取得できている。

メッセージグループID(--message-group-id

メッセージをグルーピングするために、任意の値を設定する。FIFOキューでメッセージを送信する際には、必須となる。

FIFOキューが順番通りにメッセージを返却することについては先述したが、場合によっては、その順番をもう少し柔軟に制御したいケースもあると思う。そんな時にこのグループIDが使える。

先ほどのスクリプトを少し修正して、奇数には odd、偶数には even というグループIDを付与してみる。

#!/bin/sh

for i in `seq 10`
do
  [ $(($i % 2)) = 0 ] && groupId="even" || groupId="odd"
  aws sqs send-message --queue-url $QUEUE_URL --message-body "i: $i" --message-deduplication-id $i --message-group-id $groupId
done

バッチで10件取得した結果がこれ。

{
    "Messages": [
        {
            "MessageId": "d32ecd49-6bbc-47b4-b632-4ccc18bbc822",
            "ReceiptHandle": "AQEBYImyhUSbXC6KbcaIWnQOcspb1dvYktIu7AoUrOIWcNtKWROW0FhE2FQ3Bo0GPaDgJb66T/1PLycUfVmWngts5fbC27zIz0N7NarstmS9cAfrzZ2D9+4i+5By6kAZQjlxUMuUQL3yVGsQOGlSiNnhrU7sZgzeVmoycLjAEIZYmbJ2Pi1XtDCIdeYGVYGeR6A6vpOFBnAFza2gdXsKAGcpP127FIHKoQAwpkEXcYn8jHMd+CwtLpqv+cAWZZ93F3/4HSRlraW7e7v2xiTdFERZBRDfm+GrgZnB28yGqI8I5Ik=",
            "MD5OfBody": "d59f0305126b908a0759f4afba832d3d",
            "Body": "i: 2"
        },
        {
            "MessageId": "c4d91fff-0e08-413d-acc7-8df61503f933",
            "ReceiptHandle": "AQEB+bQCU4JwgUynYjlZTc93xHYYYRFT1DQ1FK3Znp1VJ0UAlqRz/F9dWYe3w/IKcXJSFVINSNlg0uqNjPMZs8CPxW07urHCi1a33REG82pz4OtGgvTmoXCewjByf5QeWM41Dv7uPBIFt3x30yziKle31FaJx0g83KyujzXd4N4BYE1NhOzUOQYU5J5gzq/pcSPq5HPUzUaaCRsP6yeffXh8XH/c/oVm+9QIGcsk2509I3vCRgilngvTx9UVuOumtnaRz/AdOEumsBbmUe2tICPYtPSUBDsSzlgmiOdfGf5EpPg=",
            "MD5OfBody": "b4c9b75a632f11b1b42ea5307e5147c8",
            "Body": "i: 4"
        },
        {
            "MessageId": "76950ae0-4499-467f-90ae-d461b3bbbbba",
            "ReceiptHandle": "AQEBd3wIOr4cJGHvrWc2GtOtXjlnjAAT1+r38lHAc/DY3n4isYF/7oWZVJTA1jIgLmyVHyT5gd2U8ZrTntEnh2rnLpTUcPwafsib6NZsdQQniB+YAn+PfWQv4GoZN+BAxxBt3SbCMgncM3IQJMVQjz5YJI5OKiOmM+8rucNIm2NNxwzz3QJLx0MGvuYp+i2FpubiZLn8jDaC0sjJ0qhQVhBXV6EZ9iAOnc2L1OkZS/+Kt6pV4SPk+nnlscPgeYdAR63vDyJoEpfxT36kUJrT4RIr84Mc97Uq5sW23DGc3P1/t+k=",
            "MD5OfBody": "8a32c5bec4a80d60b5829eec83f2bedc",
            "Body": "i: 6"
        },
        {
            "MessageId": "44df6897-e750-4fe6-993e-fbc81770569c",
            "ReceiptHandle": "AQEB4/aUU/P1wXlgR0S7w7FK/fGT1GkA53qhktiriBb78P7Wwym8NYKxZZta8l7F1f5OD00K3UTbcZueWJSXmJslJZjYwDiEZETQg7Rp3WMlhnCALsPOAVqAkLlUnsOz943XDukRyIHlZ7MHYLTDthKBn4zDYblWqze4W8J6aIH51rmP/z6nzz8ADZVX/OaUO+C6FScQvXwZOl3YAM5rlctLde+qK/JnmZlZ82BOswx4420JwOGgATf16vNbYoZ1i9ArlMBSyuHYjye+qbXCfwPm3qLHipp/KhF5TVWIRrNuxZU=",
            "MD5OfBody": "11f1baece959c0e5461da92542d33111",
            "Body": "i: 8"
        },
        {
            "MessageId": "ef02c1f8-3a01-4c28-a2d5-f3b6ed9374aa",
            "ReceiptHandle": "AQEBLR/bnxyQrYvY6Mpo4MNRQGKf+g25i5caJ1xvYdKx0YbVFC3b3C0XXu/W3RImQJ4fmGIdksZq0LeVENqQZ9mNvUywtygMEmkeuHMEaX0R3/FHrBhWlJBdUQEmejFk8m+BBoxMNz2k3G1E26uN2ddwzXSOJn4Ut5Lr4spFrqq4DFG9dhfA82MKhmrMtT+yISyBDQpoqZlF+/y0LQsW7gdpPtupF6OVMJv/VwZXbPh5BwYGfxqGyK01gJZzEUcuG18gVZr+nE+SkwCP1olAKTwWOkPVwnsqh/F1fgMqYAmLLR4=",
            "MD5OfBody": "aade5c894d4283088c7a1cb3194dbdbb",
            "Body": "i: 10"
        },
        {
            "MessageId": "c91e6ebc-a6af-4c77-b940-5db24fe1f998",
            "ReceiptHandle": "AQEBYNqf2JVIZPZ82BwywsCf5UE8bIsvh1ewXben4pCycXiKQs0IaUkuzwZH/HPPs0nilhp2SW7bYhAZQgtDWLAHDAjFaamxjrxc5U5QG4x7tKmuz16bigR4jEYh9xIgntxfqEhktOUeVuEOVzw8V6YObcX3D7ZmAQnn2nZTOYi6VpQRHvxWTxS52xQkwATlRuRWLYiPf9PSAaQpHzz1PPuwrpnYS1X5lT3V0fZ1tzsN9azIg4mm3kJI7aoh9inSdoomN4UhvFjboB054kO7O+UgY7ncT2ydCrcNXIUVwo91lfM=",
            "MD5OfBody": "342fa06dbd9a92acf61e543c3263597d",
            "Body": "i: 1"
        },
        {
            "MessageId": "ec9f5560-b17a-4069-bdac-97c78caa80e6",
            "ReceiptHandle": "AQEBJIxmMRZEaAQdhUp8lzwbPzlbsCfn1/xQ+A+c8vQdE+A06Q7DwymQGGBYTPDwfa7e/kFe/APpWTKsu5mjxC2xsa9kG/R0lI7fYYvjbTsI3KA5Gky0qCiD/Gu8lS+RpZgf4fYPJ7YuWuscHnBVGR6WtykJ0nVa/oDSGhXM1OVhYzmWRlbrThcgfcFcqmtWkBLEwyVIk6xNz0zWuBn/ceGFnlGaM46zrPdWhhaX8/2LegQtGQHI43mnuRx1Cr83t28VrpoR61LlZLc7/nUHGls5H+oJHRRB2fYKuLHifqthHcY=",
            "MD5OfBody": "0ae31233b58dce299941dfec44f9e974",
            "Body": "i: 3"
        },
        {
            "MessageId": "12209561-5bee-489b-a43d-6ff6158bc61b",
            "ReceiptHandle": "AQEBAvAeeKWTPgsFAiYiNytwK2iuinFErKXYgXJx6o2Ql+9CLgJ/9rrTy7v3fD1X5s7nq9dZSSW3NQgVZRCWHACoa3lon7wUufbeY9aQYot1+ahPh/YkwlAPbkmnYcbOVER/EXPDs+8w7Odp9CQgDd8ntOoZfH5PcfGFm8c4e8dzzyPkxU4WGvwm8dj6k5rtWv1mLzoDY6u1DOJnoADHtMInHppohKMKLjs0tBczyz7V4Mm5rGr5jsvQz0WWTASeSzTwkS7erEvcTAyE9wZ7zuHV+6jSOnqiYi+KZoRto18NLMU=",
            "MD5OfBody": "ed07616b6de421eee6aba693ffc86e8f",
            "Body": "i: 5"
        },
        {
            "MessageId": "a740a242-b4ef-4e18-8545-e9d243ead677",
            "ReceiptHandle": "AQEBs3lj33PvufOvIQatD8d5jp/pcSm1+3rVsZ1ny//i5n9RcpLjr2ukTG1yvEOdrsmpn8ngzpp/fo4x3mEvT1KFnfz2+oUly5Cl5ca1nDcX7NhLMHz4NMIF5G5g9NdIXUb5fBvuLy+LNH1mGe29uDLyG9IwOeKfbtHe7GpvA2z1o97Vywm57A5XMIjIWbbi2U7LlzZOIax8O0zWJ111iM8BY+fKKz3s7BFYCXTxKKjqMoxt2d+d7qauxQHiz1CtEy/yRnfVIbZOCTg/hQhZB9v1G6HRxLOBDZdWmuheyXe15Kg=",
            "MD5OfBody": "87eb3aef4e0dfb7cdfbdf35c3c40a6d9",
            "Body": "i: 7"
        },
        {
            "MessageId": "15a160c6-a51f-4fe4-b267-fb0b09cc7d65",
            "ReceiptHandle": "AQEBj5wKbfhDuw8sba0IiehVIfoQpaZaQSZmWQHEugxNkxW+ulPv24udmvRQDdo5D7VG9a9h8T7QnffRFvuOpbn79oEBB6tkMHqo1RLCGTQA5vXXdJTWAl93wzBOrIn33RiD8RqSp4t+KJNsljSZavT+94HW2YupLnzs+gO9nUdNh2QUhhHCOFKwXAAkjL83TNhUO621+mqrF8+CjL0bNBZrNIIRvqKMaKAr3JBU7I4JkwulFd59BFC9nst4n7iFQ8c0p8o3UGzYnBlYSfkQo/djBKnVWkUmjFtO45j2m/d1kB8=",
            "MD5OfBody": "e4b31e207828c82c6581ccc20daa1c9a",
            "Body": "i: 9"
        }
    ]
}

even グループが先に順番通り返され、その後に odd グループが順番通りに並んでいる。 FIFOキューがメッセージを必ず順番通りに処理することを保証するのは、この メッセージグループID のスコープ内における話。あるグループ内のメッセージは必ず順番通りになる。

が、グループ間の順序は保証されない。 そして取得したいグループをAPIで指定することはできず、どのグループのメッセージが先に返されるかはわからない。SQSはなるべく多くのメッセージが保存されているグループを優先的に処理しようとはするらしい。また、バッチ処理であるグループのメッセージが指定の件数に満たない場合は、別のグループのメッセージで埋めようとする。

つまり、Aグループが4件、Bグループが7件のメッセージをもっているときに、10件取得しようとすると、優先的にBを7件取得し、余っている3件をAグループで埋めようとする。このときAグループから取得される3件は、Aグループ内の順番に従って先頭から3件になる。

このときAグループには1件利用可能なメッセージが残っているが、ここで仮に別のコンシューマからメッセージの取得リクエストを送った場合、メッセージは取得されない。なぜなら、最初にリクエストしたコンシューマの処理が終わっていないからである。ここで取得できてしまうと、Aグループのメッセージが順番通り処理されることを保証できなくなってしまう。

可視性タイムアウトの時間が来たら再び取得できるようになる。

重複排除ID (--message-deduplication-id

順番の保証と並んで、FIFOキューのもう一つの大きな特徴である1回限りの処理について見ていく。

注文に対する課金処理などは、二重に処理されることは許されないので、この特性を生かすことができる。

そのために、メッセージに一意となる重複排除IDを付与する。自前で生成してもいいが、コンテンツベースのポリシー というものを設定すれば、メッセージの内容からSHA-256ハッシュを使って自動で生成してくれる。

CloudFormation で ContentBasedDeduplication を有効にする。

Resources:
  SampleQueue:
    Type: AWS::SQS::Queue
    Properties: 
      QueueName: my-sample-q.fifo
      FifoQueue: true
      ContentBasedDeduplication: true

同じメッセージを5回連続で送る

$ aws sqs send-message $QUEUE_URL --message-body "Hello world" --message-group-id "hoge"
{
    "MD5OfMessageBody": "3e25960a79dbc69b674cd4ec67a72c62",
    "MessageId": "ef262f05-2b1f-4642-830f-2fdb9077b1bf",
    "SequenceNumber": "18852555406053872128"
}
$ aws sqs send-message $QUEUE_URL --message-body "Hello world" --message-group-id "hoge"
{
    "MD5OfMessageBody": "3e25960a79dbc69b674cd4ec67a72c62",
    "MessageId": "ef262f05-2b1f-4642-830f-2fdb9077b1bf",
    "SequenceNumber": "18852555406053872128"
}
$ aws sqs send-message $QUEUE_URL --message-body "Hello world" --message-group-id "hoge"
{
    "MD5OfMessageBody": "3e25960a79dbc69b674cd4ec67a72c62",
    "MessageId": "ef262f05-2b1f-4642-830f-2fdb9077b1bf",
    "SequenceNumber": "18852555406053872128"
}
$ aws sqs send-message $QUEUE_URL --message-body "Hello world" --message-group-id "hoge"
{
    "MD5OfMessageBody": "3e25960a79dbc69b674cd4ec67a72c62",
    "MessageId": "ef262f05-2b1f-4642-830f-2fdb9077b1bf",
    "SequenceNumber": "18852555406053872128"
}
$ aws sqs send-message $QUEUE_URL --message-body "Hello world" --message-group-id "hoge"
{
    "MD5OfMessageBody": "3e25960a79dbc69b674cd4ec67a72c62",
    "MessageId": "ef262f05-2b1f-4642-830f-2fdb9077b1bf",
    "SequenceNumber": "18852555406053872128"
}

全て同じ MessageId であることがわかる。

リクエスト自体は成功するのだが、SQS内部でコンテンツを元に重複を排除している。メッセージは1件しか保存されておらず、5件の受信リクエストを送っても1件しか返ってこない。

FIFOキューではこの重複排除の仕様があるため、コンテンツベースのポリシーを有効にするか、そうでない場合はリクエストに重複排除IDが必須となる。両方指定すると、リクエストで指定したIDがコンテンツベースの自動生成IDを上書きする。

An error occurred (InvalidParameterValue) when calling the SendMessage operation: The queue should either have ContentBasedDeduplication enabled or MessageDeduplicationId provided explicitly

この重複排除の期間はメッセージを送ってから5分間。 それ以降は別のメッセージとして登録されることに注意。

こんな感じです。

最後まで読んだ人は「SQS完全に理解した🧘‍♀️」って言って良いです。