Solution
Combinez plusieurs requêtes en un seul lot de requêtes. Le lot de la demande sera envoyé au nœud de cluster pour traitement. chaque demande étant traitée exactement de la même manière qu’une demande individuelle. Il répondra ensuite avec le lot de réponses.
Prenons l’exemple d’un magasin de valeurs-clés distribué, où le client envoie des demandes pour stocker plusieurs valeurs-clés sur le serveur. Lorsque le client reçoit un appel pour envoyer la requête, il ne l’envoie pas immédiatement sur le réseau ; au lieu de cela, il conserve une file d’attente de demandes à envoyer.
Client de classe…
LinkedBlockingQueue<RequestEntry> requests = new LinkedBlockingQueue<>(); public CompletableFuture send(SetValueRequest setValueRequest) { int requestId = enqueueRequest(setValueRequest); CompletableFuture responseFuture = trackPendingRequest(requestId); return responseFuture; } private int enqueueRequest(SetValueRequest setValueRequest) { int requestId = nextRequestId(); byte[] requestBytes = serialize(setValueRequest, requestId); requests.add(new RequestEntry(requestBytes, clock.nanoTime())); return requestId; } private int nextRequestId() { return requestNumber++; }
L’heure à laquelle la requête est demandée est suivie ; ceci est ensuite utilisé pour décider si la demande peut être envoyée dans le cadre du lot.
classe RequestEntry…
class RequestEntry { byte[] serializedRequest; long createdTime; public RequestEntry(byte[] serializedRequest, long createdTime) { this.serializedRequest = serializedRequest; this.createdTime = createdTime; }
Il suit ensuite les demandes en attente à traiter lorsqu’une réponse est reçue. Chaque demande se verra attribuer un numéro de demande unique qui peut être utilisé pour mapper la réponse et compléter les demandes.
Client de classe…
Map<Integer, CompletableFuture> pendingRequests = new ConcurrentHashMap<>(); private CompletableFuture trackPendingRequest(Integer correlationId) { CompletableFuture responseFuture = new CompletableFuture(); pendingRequests.put(correlationId, responseFuture); return responseFuture; }
Le client démarre une tâche distincte qui suit en permanence les demandes mises en file d’attente.
Client de classe…
public Client(Config config, InetAddressAndPort serverAddress, SystemClock clock) { this.clock = clock; this.sender = new Sender(config, serverAddress, clock); this.sender.start(); }
classe Expéditeur…
@Override public void run() { while (isRunning) { boolean maxWaitTimeElapsed = requestsWaitedFor(config.getMaxBatchWaitTime()); boolean maxBatchSizeReached = maxBatchSizeReached(requests); if (maxWaitTimeElapsed || maxBatchSizeReached) { RequestBatch batch = createBatch(requests); try { BatchResponse batchResponse = sendBatchRequest(batch, address); handleResponse(batchResponse); } catch (IOException e) { batch.getPackedRequests().stream().forEach(r -> { pendingRequests.get(r.getCorrelationId()).completeExceptionally(e); }); } } } } private RequestBatch createBatch(LinkedBlockingQueue<RequestEntry> requests) { RequestBatch batch = new RequestBatch(MAX_BATCH_SIZE_BYTES); RequestEntry entry = requests.peek(); while (entry != null && batch.hasSpaceFor(entry.getRequest())) { batch.add(entry.getRequest()); requests.remove(entry); entry = requests.peek(); } return batch; }
classe RequestBatch…
public boolean hasSpaceFor(byte[] requestBytes) { return batchSize() + requestBytes.length <= maxSize; } private int batchSize() { return requests.stream().map(r->r.length).reduce(0, Integer::sum); }
Deux vérifications sont généralement effectuées.
- Si suffisamment de requêtes se sont accumulées pour remplir le lot à la taille maximale configurée.
classe Expéditeur…
private boolean maxBatchSizeReached(Queue<RequestEntry> requests) { return accumulatedRequestSize(requests) > MAX_BATCH_SIZE_BYTES; } private int accumulatedRequestSize(Queue<RequestEntry> requests) { return requests.stream().map(re -> re.size()).reduce((r1, r2) -> r1 + r2).orElse(0); }
Comme nous ne pouvons pas attendre indéfiniment que le lot soit rempli, nous pouvons configurer un petit temps d’attente. La tâche émettrice attend puis vérifie si la demande a été ajoutée avant le temps d’attente maximum.
classe Expéditeur…
private boolean requestsWaitedFor(long batchingWindowInMs) { RequestEntry oldestPendingRequest = requests.peek(); if (oldestPendingRequest == null) { return false; } long oldestEntryWaitTime = clock.nanoTime() - oldestPendingRequest.createdTime; return oldestEntryWaitTime > batchingWindowInMs; }
Une fois que l’une de ces conditions a été remplie, la demande par lots peut alors être envoyée au serveur. Le serveur décompresse la requête par lots et traite chacune des requêtes individuelles.
serveur de classe…
private void handleBatchRequest(RequestOrResponse batchRequest, ClientConnection clientConnection) { RequestBatch batch = JsonSerDes.deserialize(batchRequest.getMessageBodyJson(), RequestBatch.class); List<RequestOrResponse> requests = batch.getPackedRequests(); List<RequestOrResponse> responses = new ArrayList<>(); for (RequestOrResponse request : requests) { RequestOrResponse response = handleSetValueRequest(request); responses.add(response); } sendResponse(batchRequest, clientConnection, new BatchResponse(responses)); } private RequestOrResponse handleSetValueRequest(RequestOrResponse request) { SetValueRequest setValueRequest = JsonSerDes.deserialize(request.getMessageBodyJson(), SetValueRequest.class); kv.put(setValueRequest.getKey(), setValueRequest.getValue()); RequestOrResponse response = new RequestOrResponse(RequestId.SetValueResponse.getId(), "Success".getBytes(), request.getCorrelationId()); return response; }
Le client reçoit la réponse par lots et termine toutes les demandes en attente.
classe Expéditeur…
private void handleResponse(BatchResponse batchResponse) { List<RequestOrResponse> responseList = batchResponse.getResponseList(); logger.debug("Completing requests from " + responseList.get(0).getCorrelationId() + " to " + responseList.get(responseList.size() - 1).getCorrelationId()); responseList.stream().forEach(r -> { CompletableFuture completableFuture = pendingRequests.remove(r.getCorrelationId()); if (completableFuture != null) { completableFuture.complete(r); } else { logger.error("no pending request for " + r.getCorrelationId()); } }); }
Considérations techniques
La taille du lot doit être choisie en fonction de la taille des messages individuels et de la bande passante réseau disponible, ainsi que des améliorations de latence et de débit observées en fonction de la charge réelle. Ceux-ci sont configurés avec des valeurs par défaut raisonnables en supposant des tailles de message plus petites et la taille de lot optimale pour le traitement côté serveur. Par exemple, Kafka a une taille de lot par défaut de 16 Ko. Il a également un paramètre de configuration appelé “linger.ms” avec la valeur par défaut de 0. Cependant, si la taille des messages est plus grande, une taille de lot plus élevée peut mieux fonctionner.
Avoir une taille de lot trop importante n’offrira probablement que des rendements décroissants. Par exemple, avoir une taille de lot en Mo peut ajouter des frais généraux supplémentaires en termes de traitement. C’est pourquoi le paramètre de taille de lot est généralement réglé en fonction des observations faites lors des tests de performances.
Un lot de demandes est généralement utilisé avec Request Pipeline pour améliorer le débit et la latence globaux.
Lorsque la stratégie d’interruption de nouvelle tentative est utilisée pour envoyer des requêtes aux nœuds de cluster, la totalité de la requête par lots sera réessayée. Le nœud de cluster a peut-être déjà traité une partie du lot ; afin de vous assurer que la nouvelle tentative fonctionne sans aucun problème, vous devez implémenter Idempotent Receiver.