đ° DĂ©butants qui dĂ©couvrent AWS CDK, veuillez consulter mes articles prĂ©cĂ©dents un par un dans cette sĂ©rie.
Si au cas oĂč vous auriez manquĂ© mon article prĂ©cĂ©dent, retrouvez-le avec les liens ci-dessous.
đ Message prĂ©cĂ©dent original sur đ Dev Post
đ RepubliĂ© le post prĂ©cĂ©dent Ă đ dev Ă @aravindvcyber
Dans cet article, refactorisons notre stepfunction prĂ©cĂ©dente qui invoque directement notre fonction lambda en ajoutant une file dâattente similaire qui dĂ©clenche lambda indirectement avec une limite de taille de lot. Cela pourrait nous aider Ă obtenir une meilleure optimisation de la concurrence lambda comme celle de notre article prĂ©cĂ©dent. traitement Ă©volutif pilotĂ© par les Ă©vĂ©nements Ă lâaide dâeventbridge et de sqs
Flux de courant âïž
ProposĂ© đ
Avantages obtenus dans cette approche đŁââïž
Il y a plusieurs avantages associés à cette approche comme suit.
-
Nous pouvons dĂ©tacher lâinvocation directe de notre lambda par Stepfunction dĂ©clenchant ainsi une invocation indirecte/buffĂ©risĂ©e Ă partir dâune file dâattente SQS standard.
-
Avec cette approche, nous pourrons avoir la limite de simultanéité lambda au minimum en regroupant les déclencheurs lambda.
-
De plus, si vous vous souvenez dâune chose, notre lambda est censĂ© effectuer une opĂ©ration dâĂ©criture dynamodb, qui, si elle sâexĂ©cute en haute simultanĂ©itĂ©, nĂ©cessiterait plus dâunitĂ©s dâĂ©criture lors de lâutilisation de notre table dynamodb. Avec cette approche, nous pouvons utiliser maintenir et utiliser des unitĂ©s dâĂ©criture cohĂ©rentes dans notre table.
-
En plus de cela, nous pourrons facilement le mettre Ă lâĂ©chelle dans des environnements plus Ă©levĂ©s.
-
Nous pouvons Ă©galement utiliser les files dâattente de lettres mortes pour le dĂ©bogage et lâinspection.
Affiner la file dâattente utilisĂ©e dans notre construction prĂ©cĂ©dente đ
Modifions notre construction précédente construct/sfn-simple.ts
en ajoutant des dĂ©finitions pour la file dâattente de lettres mortes et la file dâattente de travail comme suit.
File dâattente de lettres mortes đŸ
La file dâattente de lettres mortes doit rĂ©cupĂ©rer les messages ayant Ă©chouĂ©, de sorte que la file dâattente de travail a toujours des enregistrements Ă traiter.
const sfnCommonEventProcessorQueueDLQ: DeadLetterQueue = {
queue: new Queue(this, "sfnCommonEventProcessorQueueDLQ", {
retentionPeriod: Duration.days(14),
removalPolicy: RemovalPolicy.DESTROY,
queueName: "sfnCommonEventProcessorQueueDLQ",
}),
maxReceiveCount: 100,
};
File dâattente standard đ
La file dâattente standard peut Ă©galement ĂȘtre une file dâattente FIFO basĂ©e sur le coĂ»t et les exigences commerciales, comme indiquĂ© ci-dessous.
const sfnCommonEventProcessorQueue = new Queue(
this,
"sfnCommonEventProcessorQueue",
{
retentionPeriod: Duration.days(5),
removalPolicy: RemovalPolicy.DESTROY,
deliveryDelay: Duration.seconds(3),
queueName: "sfnCommonEventProcessorQueue",
visibilityTimeout: Duration.minutes(100),
deadLetterQueue: sfnCommonEventProcessorQueueDLQ,
}
);
Nouveau travail pour envoyer des messages dans notre file dâattente đŻ
Tout comme nous avons dĂ©fini une nouvelle tĂąche dâinvocation lambda, nous pouvons dĂ©finir une nouvelle tĂąche qui poussera un message dans une file dâattente qui sera ensuite interrogĂ©e par notre lambda existant de maniĂšre asynchrone.
Vous pouvez également constater que nous avons entiÚrement réutilisé la charge utile, inputPath, resultPath, resultSelector, etc.
const recordingQueue = new tasks.SqsSendMessage(
this,
"Record using Queue",
{
inputPath: "$",
messageBody: sfnTaskPayload,
queue: sfnCommonEventProcessorQueue,
comment: "Record message into dynamodb using SQS queue buffered lambda",
integrationPattern: sfn.IntegrationPattern.WAIT_FOR_TASK_TOKEN,
resultSelector: {
"Payload.$": "$",
"StatusCode.$": "$.statusCode",
},
resultPath: "$.recordResult",
}
);
Ajout dâune source dâĂ©vĂ©nement pour le lambda existant âș
Nous allons maintenant connecter notre nouvelle file dâattente Ă notre lambda existant Ă lâintĂ©rieur de la mĂȘme construction.
Comme vous pouvez le voir, nous avons dĂ©fini une taille de lot de la file dâattente, mĂȘme sâil y a n
nombre de fonctions dâĂ©tat exĂ©cutĂ©es simultanĂ©ment. Vous pouvez Ă©galement utiliser un FIFO
file dâattente si vous constatez que lâordre des messages est trĂšs important lors de lâenregistrement et du traitement.
triggerFunction.addEventSource(
new SqsEventSource(sfnCommonEventProcessorQueue, {
batchSize: 2,
maxBatchingWindow: Duration.seconds(10)
})
);
Ici batchSize
est le nombre maximum dâenregistrements Ă rĂ©cupĂ©rer par invocation et maxBatchingWindow
dĂ©finit le temps dâattente maximal avant de renvoyer les enregistrements Ă lambda pour traitement. Sâil nâest pas dĂ©fini, le lambda revient immĂ©diatement avec autant dâenregistrements disponibles sous le batchSize
.
ModĂšle de rappel đ
Encore une fois, si vous remarquez quelque chose, nous avons implĂ©mentĂ© le modĂšle de rappel, dans lequel nous suspendons lâexĂ©cution de la fonction step aprĂšs avoir envoyĂ© le message Ă SQS et attendons que lambda publie des mises Ă jour sur la machine dâĂ©tat une fois quâelle a effectuĂ© le traitement, entraĂźnant le succĂšs ou lâĂ©chec.
Avec cette approche, nous pourrions comprendre oĂč se dĂ©roule le travail rĂ©el et ne pas continuer Ă interroger les mises Ă jour encore et encore jusquâĂ ce que le rĂ©sultat soit prĂȘt.
illustration du modĂšle de rappel âŠïž
illustration du modĂšle de rappel en cas de rĂ©ussite đ
Modifications de la fonction lambda đŒ
Les modifications actuelles que nous avons apportĂ©es semblent suffisantes Ă premiĂšre vue, mais elles Ă©choueront, car le courant lambda de charge utile obtenu sera diffĂ©rent de lâancienne invocation. En effet, alors que SQS est interrogĂ© par lambda, le message rĂ©el sera Ă lâintĂ©rieur dâun objet qui contient un tableau dâenregistrements dâĂ©vĂ©nements. Et Ă lâintĂ©rieur de lâenregistrement de lâĂ©vĂ©nement, la propriĂ©tĂ© body donnera le message rĂ©el, lorsque nous lâavons transmis comme charge utile dans notre article prĂ©cĂ©dent. Nous devons donc certainement ajuster un peu notre lambda comme suit.
{
"Records": [
{
"messageId": "ff11de7f-795a-4c85-8612-9d376e01df0d",
"receiptHandle": "**********",
"body": "{\"Record\":{\"createdAt\":\"2022-04-18T07:51:06Z\",\"messageId\":\"538a8436-9987-1f65-478d-815c77f00d0f\",\"event\":{\"message\":\"A secret message\"}},\"MyTaskToken\":\"**********\"}",
"attributes": {
"ApproximateReceiveCount": "3",
"SentTimestamp": "1650268267274",
"SenderId": "AROAYLZFFS6HZ3ESZEEBH",
"ApproximateFirstReceiveTimestamp": "1650268270274"
},
"messageAttributes": {},
"md5OfBody": "edddae585d76eb14439f5804dcef24a4",
"eventSource": "aws:sqs",
"eventSourceARN": "**********",
"awsRegion": "ap-south-1"
}
]
}
Réécriture du code lambda đ
Alors, Ă©crivons la fonction lambda de lâenregistreur de messages comme indiquĂ© ci-dessous en prenant note des changements de charge utile ci-dessus.
Pensons que cela nous donne une Ă©tape supplĂ©mentaire, cette Ă©tape nous aide plutĂŽt Ă traiter lâĂ©criture sur lambda dans un lot avec des enregistrements de messages pendant le processus dâinterrogation.
import { PutItemInput } from "aws-sdk/clients/dynamodb";
import { DynamoDB, StepFunctions } from "aws-sdk";
const sfn = new StepFunctions({ apiVersion: "2016-11-23" });
exports.processor = async function (event: any) {
const dynamo = new DynamoDB();
let result: any | undefined = undefined;
await Promise.all(
event.Records.map(async (Record: any) => {
const msg = JSON.parse(Record.body).Record;
const crt_time: number = new Date(msg.createdAt).getTime();
const putData: PutItemInput = {
TableName: process.env.MESSAGES_TABLE_NAME || "",
Item: {
messageId: { S: msg.messageId },
createdAt: { N: `${crt_time}` },
event: { S: JSON.stringify(msg.event) },
},
ReturnConsumedCapacity: "TOTAL",
};
try {
result = await dynamo.putItem(putData).promise();
} catch (err) {
const sendFailure: StepFunctions.SendTaskFailureInput = {
error: JSON.stringify(err),
cause: JSON.stringify({
statusCode: 500,
headers: { "Content-Type": "text/json" },
putStatus: {
messageId: msg.messageId,
ProcessorResult: err,
},
}),
taskToken: JSON.parse(Record.body).MyTaskToken,
};
console.log(sendFailure);
await sfn.sendTaskFailure(sendFailure, function (err: any, data: any) {
if (err) console.log(err, err.stack);
else console.log(data);
});
return sendFailure;
}
const sendSuccess: StepFunctions.SendTaskSuccessInput = {
output: JSON.stringify({
statusCode: 200,
headers: { "Content-Type": "text/json" },
putStatus: {
messageId: msg.messageId,
ProcessorResult: result,
},
}),
taskToken: JSON.parse(Record.body).MyTaskToken,
};
console.log(sendSuccess);
await sfn
.sendTaskSuccess(sendSuccess, function (err: any, data: any) {
if (err) console.log(err, err.stack);
else console.log(data);
})
.promise();
return sendSuccess;
})
);
};
rĂ©sumĂ© des changements lambda đ
exports.processor = async function (event: any) {
const dynamo = new DynamoDB();
let result: any | undefined = undefined;
await Promise.all(
event.Records.map(async (Record: any) => {
const msg = JSON.parse(Record.body).Record;
})
);
};
Changements de TaskToken đ
Outre le taskToken
a Ă©galement Ă©tĂ© modifiĂ©, Ă lâintĂ©rieur du lambda de lâexemple prĂ©cĂ©dent Ă la valeur ci-dessous.
Avant
taskToken: Record.MyTaskToken,
AprĂšs
taskToken: JSON.parse(Record.body).MyTaskToken,
Timeout de la fonction pas Ă pas đŒ
Notez que le dĂ©lai dâattente dĂ©fini pour la fonction pas Ă pas annulera et quittera lâexĂ©cution si elle attend plus de temps que la valeur de dĂ©lai dâattente dĂ©finie pour la fonction pas Ă pas.
b799d9d4-5c32-5f42-89bb-830315b023f7 INFO TaskTimedOut: Task Timed Out: 'Provided task does not exist anymore'
at Request.extractError (/var/runtime/node_modules/aws-sdk/lib/protocol/json.js:52:27)
at Request.callListeners (/var/runtime/node_modules/aws-sdk/lib/sequential_executor.js:106:20)
at Request.emit (/var/runtime/node_modules/aws-sdk/lib/sequential_executor.js:78:10)
at Request.emit (/var/runtime/node_modules/aws-sdk/lib/request.js:686:14)
at Request.transition (/var/runtime/node_modules/aws-sdk/lib/request.js:22:10)
at AcceptorStateMachine.runTo (/var/runtime/node_modules/aws-sdk/lib/state_machine.js:14:12)
at /var/runtime/node_modules/aws-sdk/lib/state_machine.js:26:10
at Request.<anonymous> (/var/runtime/node_modules/aws-sdk/lib/request.js:38:9)
at Request.<anonymous> (/var/runtime/node_modules/aws-sdk/lib/request.js:688:12)
at Request.callListeners (/var/runtime/node_modules/aws-sdk/lib/sequential_executor.js:116:18) {
code: 'TaskTimedOut',
DĂ©ploiement de nos changements đȘ
Pendant que vous dĂ©ployez vos modifications, vous pouvez ĂȘtre invitĂ© Ă accepter la modification IAM pour accorder les privilĂšges nĂ©cessaires Ă sfn pour envoyer des messages Ă SQS, similaire Ă ce que nous avons explicitement accordĂ© Ă lambda dans notre dernier article.
Exemple de journal dâexĂ©cution đ°
Ainsi, nous avons atteint lâobjectif de cet article de faire en sorte que la fonction step permette de placer les messages sous forme dâenregistrements dans une file dâattente, qui est ensuite interrogĂ©e par le lambda et traitĂ©e ultĂ©rieurement pour renvoyer lâĂ©tat Ă la machine dâĂ©tat. Cette architecture est donc beaucoup plus Ă©volutive que la prĂ©cĂ©dente avec lâinvocation directe de la fonction lambda.
Nous ajouterons plus de connexions à notre pile et la rendrons plus utilisable dans les prochains articles en créant de nouvelles constructions, alors pensez à suivre et à vous abonner à ma newsletter.
â Nous avons notre prochain article en serverless, consultez
đ Merci pour votre soutien ! đ
Ce serait formidable si vous aimez â Achetez-moi un cafĂ©, pour aider Ă stimuler mes efforts.
đ Message original sur đ Dev Post
đ RepubliĂ© Ă đ dev Ă @aravindvcyber
đŹ AWS CDK 101 â đŹ Ajout dâune file dâattente pour tamponner notre stepfunction appelant directement lambda#manuscrit #sans serveur #messagequeue #awscdk #aws https://t.co/Fm785cuPf8
â Aravind V (@Aravind_V7) 19 avril 2022