X

AWS CDK 101 – Ajout d’une file d’attente pour mettre en mĂ©moire tampon notre stepfunction invoquer lambda


🔰 DĂ©butants qui dĂ©couvrent AWS CDK, veuillez consulter mes articles prĂ©cĂ©dents un par un dans cette sĂ©rie.

Si au cas oĂč vous auriez manquĂ© mon article prĂ©cĂ©dent, retrouvez-le avec les liens ci-dessous.

🔁 Message prĂ©cĂ©dent original sur 🔗 Dev Post

🔁 RepubliĂ© le post prĂ©cĂ©dent Ă  🔗 dev Ă  @aravindvcyber

Dans cet article, refactorisons notre stepfunction prĂ©cĂ©dente qui invoque directement notre fonction lambda en ajoutant une file d’attente similaire qui dĂ©clenche lambda indirectement avec une limite de taille de lot. Cela pourrait nous aider Ă  obtenir une meilleure optimisation de la concurrence lambda comme celle de notre article prĂ©cĂ©dent. traitement Ă©volutif pilotĂ© par les Ă©vĂ©nements Ă  l’aide d’eventbridge et de sqs

Flux de courant ☘

ProposĂ© 🍀

Avantages obtenus dans cette approche đŸšŁâ€â™€ïž

Il y a plusieurs avantages associés à cette approche comme suit.

  • Nous pouvons dĂ©tacher l’invocation directe de notre lambda par Stepfunction dĂ©clenchant ainsi une invocation indirecte/buffĂ©risĂ©e Ă  partir d’une file d’attente SQS standard.

  • Avec cette approche, nous pourrons avoir la limite de simultanĂ©itĂ© lambda au minimum en regroupant les dĂ©clencheurs lambda.

  • De plus, si vous vous souvenez d’une chose, notre lambda est censĂ© effectuer une opĂ©ration d’écriture dynamodb, qui, si elle s’exĂ©cute en haute simultanĂ©itĂ©, nĂ©cessiterait plus d’unitĂ©s d’écriture lors de l’utilisation de notre table dynamodb. Avec cette approche, nous pouvons utiliser maintenir et utiliser des unitĂ©s d’écriture cohĂ©rentes dans notre table.

  • En plus de cela, nous pourrons facilement le mettre Ă  l’échelle dans des environnements plus Ă©levĂ©s.

  • Nous pouvons Ă©galement utiliser les files d’attente de lettres mortes pour le dĂ©bogage et l’inspection.

Affiner la file d’attente utilisĂ©e dans notre construction prĂ©cĂ©dente 🔖

Modifions notre construction prĂ©cĂ©dente construct/sfn-simple.ts en ajoutant des dĂ©finitions pour la file d’attente de lettres mortes et la file d’attente de travail comme suit.

File d’attente de lettres mortes đŸŽŸ

La file d’attente de lettres mortes doit rĂ©cupĂ©rer les messages ayant Ă©chouĂ©, de sorte que la file d’attente de travail a toujours des enregistrements Ă  traiter.

 const sfnCommonEventProcessorQueueDLQ: DeadLetterQueue = {
      queue: new Queue(this, "sfnCommonEventProcessorQueueDLQ", {
        retentionPeriod: Duration.days(14),
        removalPolicy: RemovalPolicy.DESTROY,
        queueName: "sfnCommonEventProcessorQueueDLQ",
      }),
      maxReceiveCount: 100,
};

File d’attente standard 🏊

La file d’attente standard peut Ă©galement ĂȘtre une file d’attente FIFO basĂ©e sur le coĂ»t et les exigences commerciales, comme indiquĂ© ci-dessous.


const sfnCommonEventProcessorQueue = new Queue(
      this,
      "sfnCommonEventProcessorQueue",
      {
        retentionPeriod: Duration.days(5),
        removalPolicy: RemovalPolicy.DESTROY,
        deliveryDelay: Duration.seconds(3),
        queueName: "sfnCommonEventProcessorQueue",
        visibilityTimeout: Duration.minutes(100),
        deadLetterQueue: sfnCommonEventProcessorQueueDLQ,
      }
);

Nouveau travail pour envoyer des messages dans notre file d’attente 🎯

Tout comme nous avons dĂ©fini une nouvelle tĂąche d’invocation lambda, nous pouvons dĂ©finir une nouvelle tĂąche qui poussera un message dans une file d’attente qui sera ensuite interrogĂ©e par notre lambda existant de maniĂšre asynchrone.

Vous pouvez également constater que nous avons entiÚrement réutilisé la charge utile, inputPath, resultPath, resultSelector, etc.

const recordingQueue = new tasks.SqsSendMessage(
      this,
      "Record using Queue",
      {
        inputPath: "$",
        messageBody: sfnTaskPayload,
        queue: sfnCommonEventProcessorQueue,
        comment: "Record message into dynamodb using SQS queue buffered lambda",
        integrationPattern: sfn.IntegrationPattern.WAIT_FOR_TASK_TOKEN,
        resultSelector: {
          "Payload.$": "$",
          "StatusCode.$": "$.statusCode",
        },
        resultPath: "$.recordResult",
      }
);

Ajout d’une source d’évĂ©nement pour le lambda existant â›ș

Nous allons maintenant connecter notre nouvelle file d’attente Ă  notre lambda existant Ă  l’intĂ©rieur de la mĂȘme construction.

Comme vous pouvez le voir, nous avons dĂ©fini une taille de lot de la file d’attente, mĂȘme s’il y a n nombre de fonctions d’état exĂ©cutĂ©es simultanĂ©ment. Vous pouvez Ă©galement utiliser un FIFO file d’attente si vous constatez que l’ordre des messages est trĂšs important lors de l’enregistrement et du traitement.

triggerFunction.addEventSource(
      new SqsEventSource(sfnCommonEventProcessorQueue, {
        batchSize: 2,
        maxBatchingWindow: Duration.seconds(10)
      })
    );

Ici batchSize est le nombre maximum d’enregistrements Ă  rĂ©cupĂ©rer par invocation et maxBatchingWindow dĂ©finit le temps d’attente maximal avant de renvoyer les enregistrements Ă  lambda pour traitement. S’il n’est pas dĂ©fini, le lambda revient immĂ©diatement avec autant d’enregistrements disponibles sous le batchSize.

Modùle de rappel 👓

Encore une fois, si vous remarquez quelque chose, nous avons implĂ©mentĂ© le modĂšle de rappel, dans lequel nous suspendons l’exĂ©cution de la fonction step aprĂšs avoir envoyĂ© le message Ă  SQS et attendons que lambda publie des mises Ă  jour sur la machine d’état une fois qu’elle a effectuĂ© le traitement, entraĂźnant le succĂšs ou l’échec.

Avec cette approche, nous pourrions comprendre oĂč se dĂ©roule le travail rĂ©el et ne pas continuer Ă  interroger les mises Ă  jour encore et encore jusqu’à ce que le rĂ©sultat soit prĂȘt.

illustration du modĂšle de rappel ♊

illustration du modĂšle de rappel en cas de rĂ©ussite 💎

Modifications de la fonction lambda đŸŽŒ

Les modifications actuelles que nous avons apportĂ©es semblent suffisantes Ă  premiĂšre vue, mais elles Ă©choueront, car le courant lambda de charge utile obtenu sera diffĂ©rent de l’ancienne invocation. En effet, alors que SQS est interrogĂ© par lambda, le message rĂ©el sera Ă  l’intĂ©rieur d’un objet qui contient un tableau d’enregistrements d’évĂ©nements. Et Ă  l’intĂ©rieur de l’enregistrement de l’évĂ©nement, la propriĂ©tĂ© body donnera le message rĂ©el, lorsque nous l’avons transmis comme charge utile dans notre article prĂ©cĂ©dent. Nous devons donc certainement ajuster un peu notre lambda comme suit.

{
    "Records": [
        {
            "messageId": "ff11de7f-795a-4c85-8612-9d376e01df0d",
            "receiptHandle": "**********",
            "body": "{\"Record\":{\"createdAt\":\"2022-04-18T07:51:06Z\",\"messageId\":\"538a8436-9987-1f65-478d-815c77f00d0f\",\"event\":{\"message\":\"A secret message\"}},\"MyTaskToken\":\"**********\"}",
            "attributes": {
                "ApproximateReceiveCount": "3",
                "SentTimestamp": "1650268267274",
                "SenderId": "AROAYLZFFS6HZ3ESZEEBH",
                "ApproximateFirstReceiveTimestamp": "1650268270274"
            },
            "messageAttributes": {},
            "md5OfBody": "edddae585d76eb14439f5804dcef24a4",
            "eventSource": "aws:sqs",
            "eventSourceARN": "**********",
            "awsRegion": "ap-south-1"
        }
    ]
}

Réécriture du code lambda 📝

Alors, Ă©crivons la fonction lambda de l’enregistreur de messages comme indiquĂ© ci-dessous en prenant note des changements de charge utile ci-dessus.

Pensons que cela nous donne une Ă©tape supplĂ©mentaire, cette Ă©tape nous aide plutĂŽt Ă  traiter l’écriture sur lambda dans un lot avec des enregistrements de messages pendant le processus d’interrogation.

import { PutItemInput } from "aws-sdk/clients/dynamodb";
import { DynamoDB, StepFunctions } from "aws-sdk";
const sfn = new StepFunctions({ apiVersion: "2016-11-23" });
exports.processor = async function (event: any) {
  const dynamo = new DynamoDB();
  let result: any | undefined = undefined;
  await Promise.all(
    event.Records.map(async (Record: any) => {

      const msg = JSON.parse(Record.body).Record;
      const crt_time: number = new Date(msg.createdAt).getTime();
      const putData: PutItemInput = {
        TableName: process.env.MESSAGES_TABLE_NAME || "",
        Item: {
          messageId: { S: msg.messageId },
          createdAt: { N: `${crt_time}` },
          event: { S: JSON.stringify(msg.event) },
        },
        ReturnConsumedCapacity: "TOTAL",
      }; 
      try {
        result = await dynamo.putItem(putData).promise();
      } catch (err) {
        const sendFailure: StepFunctions.SendTaskFailureInput = {
          error: JSON.stringify(err),
          cause: JSON.stringify({
            statusCode: 500,
            headers: { "Content-Type": "text/json" },
            putStatus: {
              messageId: msg.messageId,
              ProcessorResult: err,
            },
          }),
          taskToken: JSON.parse(Record.body).MyTaskToken,
        };
        console.log(sendFailure);
        await sfn.sendTaskFailure(sendFailure, function (err: any, data: any) {
          if (err) console.log(err, err.stack); 
          else console.log(data); 
        });
        return sendFailure;
      }
      const sendSuccess: StepFunctions.SendTaskSuccessInput = {
        output: JSON.stringify({
          statusCode: 200,
          headers: { "Content-Type": "text/json" },
          putStatus: {
            messageId: msg.messageId,
            ProcessorResult: result,
          },
        }),
        taskToken: JSON.parse(Record.body).MyTaskToken,
      };

      console.log(sendSuccess);

      await sfn
        .sendTaskSuccess(sendSuccess, function (err: any, data: any) {
          if (err) console.log(err, err.stack); 
          else console.log(data); 
        })
        .promise();

      return sendSuccess;
    })
  );
};

rĂ©sumĂ© des changements lambda 🍋

exports.processor = async function (event: any) {
  const dynamo = new DynamoDB();
  let result: any | undefined = undefined;
  await Promise.all(
    event.Records.map(async (Record: any) => {  
      const msg = JSON.parse(Record.body).Record;
      
    })
  );
};

Changements de TaskToken 🏆

Outre le taskToken a Ă©galement Ă©tĂ© modifiĂ©, Ă  l’intĂ©rieur du lambda de l’exemple prĂ©cĂ©dent Ă  la valeur ci-dessous.

Avant

taskToken: Record.MyTaskToken,

AprĂšs

taskToken: JSON.parse(Record.body).MyTaskToken,

Timeout de la fonction pas Ă  pas đŸŒŒ

Notez que le dĂ©lai d’attente dĂ©fini pour la fonction pas Ă  pas annulera et quittera l’exĂ©cution si elle attend plus de temps que la valeur de dĂ©lai d’attente dĂ©finie pour la fonction pas Ă  pas.

b799d9d4-5c32-5f42-89bb-830315b023f7    INFO    TaskTimedOut: Task Timed Out: 'Provided task does not exist anymore'
    at Request.extractError (/var/runtime/node_modules/aws-sdk/lib/protocol/json.js:52:27)
    at Request.callListeners (/var/runtime/node_modules/aws-sdk/lib/sequential_executor.js:106:20)
    at Request.emit (/var/runtime/node_modules/aws-sdk/lib/sequential_executor.js:78:10)
    at Request.emit (/var/runtime/node_modules/aws-sdk/lib/request.js:686:14)
    at Request.transition (/var/runtime/node_modules/aws-sdk/lib/request.js:22:10)
    at AcceptorStateMachine.runTo (/var/runtime/node_modules/aws-sdk/lib/state_machine.js:14:12)
    at /var/runtime/node_modules/aws-sdk/lib/state_machine.js:26:10
    at Request.<anonymous> (/var/runtime/node_modules/aws-sdk/lib/request.js:38:9)
    at Request.<anonymous> (/var/runtime/node_modules/aws-sdk/lib/request.js:688:12)
    at Request.callListeners (/var/runtime/node_modules/aws-sdk/lib/sequential_executor.js:116:18) {
  code: 'TaskTimedOut',

DĂ©ploiement de nos changements đŸȘ

Pendant que vous dĂ©ployez vos modifications, vous pouvez ĂȘtre invitĂ© Ă  accepter la modification IAM pour accorder les privilĂšges nĂ©cessaires Ă  sfn pour envoyer des messages Ă  SQS, similaire Ă  ce que nous avons explicitement accordĂ© Ă  lambda dans notre dernier article.

Exemple de journal d’exĂ©cution 🌰

Ainsi, nous avons atteint l’objectif de cet article de faire en sorte que la fonction step permette de placer les messages sous forme d’enregistrements dans une file d’attente, qui est ensuite interrogĂ©e par le lambda et traitĂ©e ultĂ©rieurement pour renvoyer l’état Ă  la machine d’état. Cette architecture est donc beaucoup plus Ă©volutive que la prĂ©cĂ©dente avec l’invocation directe de la fonction lambda.

Nous ajouterons plus de connexions à notre pile et la rendrons plus utilisable dans les prochains articles en créant de nouvelles constructions, alors pensez à suivre et à vous abonner à ma newsletter.

⏭ Nous avons notre prochain article en serverless, consultez

🎉 Merci pour votre soutien ! 🙏

Ce serait formidable si vous aimez ☕ Achetez-moi un cafĂ©, pour aider Ă  stimuler mes efforts.

🔁 Message original sur 🔗 Dev Post

🔁 RepubliĂ© Ă  🔗 dev Ă  @aravindvcyber

🍬 AWS CDK 101 – 🔬 Ajout d’une file d’attente pour tamponner notre stepfunction appelant directement lambda#manuscrit #sans serveur #messagequeue #awscdk #aws https://t.co/Fm785cuPf8

— Aravind V (@Aravind_V7) 19 avril 2022