rozmiar dokumentów w Kafka - joachim_szlosarek - 09-03-2020
Hej,
nie jestem pewny czy to dobry dział, czy jednak powinno to wylądować w logstashu, ale trudno, mam następujący problem:
Kafka nie przyjmuje dokumentów ponieważ twierdzi, że dokumenty są za duże.
Zwiększanie limitów nic nie daje, bo dotarłem do poziomu 10MB i dalej niektórych zdarzeń logstash nie jest w stanie wysłać do kafki.
Po czasie skutkuje to zapełnieniem kolejki w logstashu, co w konsekwencji prowadzi do zawieszenia całego pipeline...
Logi z logstasha:
Code: [2020-09-03T00:53:38,603][WARN ][logstash.outputs.kafka ] KafkaProducer.send() failed: org.apache.kafka.common.er
rors.RecordTooLargeException: The message is 1223210 bytes when serialized which is larger than the maximum request
size you have configured with the max.request.size configuration. {:exception=>java.util.concurrent.ExecutionExcep
tion: org.apache.kafka.common.errors.RecordTooLargeException: The message is 1223210 bytes when serialized which is
larger than the maximum request size you have configured with the max.request.size configuration.}
[2020-09-03T00:53:38,644][INFO ][logstash.outputs.kafka ] Sending batch to Kafka failed. Will retry after a delay
. {:batch_size=>1, :failures=>1, :sleep=>0.1}
[2020-09-03T00:53:38,769][WARN ][logstash.outputs.kafka ] KafkaProducer.send() failed: org.apache.kafka.common.er
rors.RecordTooLargeException: The message is 1223210 bytes when serialized which is larger than the maximum request
size you have configured with the max.request.size configuration. {:exception=>java.util.concurrent.ExecutionExcep
tion: org.apache.kafka.common.errors.RecordTooLargeException: The message is 1223210 bytes when serialized which is
larger than the maximum request size you have configured with the max.request.size configuration.}
[2020-09-03T00:53:38,770][INFO ][logstash.outputs.kafka ] Sending batch to Kafka failed. Will retry after a delay
. {:batch_size=>1, :failures=>1, :sleep=>0.1}
[2020-09-03T00:53:38,878][INFO ][logstash.outputs.kafka ] Exhausted user-configured retry count when sending to K
afka. Dropping these events. {:max_retries=>1, :drop_count=>1}
[2020-09-03T02:15:12,763][WARN ][logstash.outputs.kafka ] KafkaProducer.send() failed: org.apache.kafka.common.er
rors.RecordTooLargeException: The message is 1216262 bytes when serialized which is larger than the maximum request
size you have configured with the max.request.size configuration. {:exception=>java.util.concurrent.ExecutionExcep
tion: org.apache.kafka.common.errors.RecordTooLargeException: The message is 1216262 bytes when serialized which is
larger than the maximum request size you have configured with the max.request.size configuration.}
[2020-09-03T02:15:12,764][INFO ][logstash.outputs.kafka ] Sending batch to Kafka failed. Will retry after a delay
. {:batch_size=>1, :failures=>1, :sleep=>0.1}
[2020-09-03T02:15:12,871][WARN ][logstash.outputs.kafka ] KafkaProducer.send() failed: org.apache.kafka.common.er
rors.RecordTooLargeException: The message is 1216262 bytes when serialized which is larger than the maximum request
size you have configured with the max.request.size configuration. {:exception=>java.util.concurrent.ExecutionExcep
tion: org.apache.kafka.common.errors.RecordTooLargeException: The message is 1216262 bytes when serialized which is
larger than the maximum request size you have configured with the max.request.size configuration.}
[2020-09-03T02:15:12,871][INFO ][logstash.outputs.kafka ] Sending batch to Kafka failed. Will retry after a delay
. {:batch_size=>1, :failures=>1, :sleep=>0.1}
Jak najlepiej rozwiązać powyższy problem?
RE: rozmiar dokumentów w Kafka - SzymonK - 09-04-2020
W zależności od potrzeb proponuje 3 rozwiązania które powinny się sprawdzić:
1. Tagowanie dokumentów posiadających więcej niż 10000 znaków. w polu message. Taki dokument można w sekcji output skierować np pliku - output file{}, a następnie podejrzeć i odpowiednio sparsować tak żeby pole message było już pocięte na odpowiednie pola. W tym przypadku duże dokumenty będą pomijały output kafki i pipeline logstasha się nie zapełni.
Code: filter {
ruby {
code => "
if event.get('message').length > 10000
event.tag('TLTR')
end
"
}
}
2. Truncate + Tagowanie. Dokument zostanie obcięty po określonej liczbie bajtów oraz otagowany tak żeby było wiadomo który message jest ucięty.
W tym przypadku duże dokumenty będą ucinane i zostaną poprawnie odebrane po stronie kafki,a pipeline logstasha się nie zapełni.
Code: filter {
truncate {
fields => ["message"]
length_bytes => 49999999
add_tag => "TLTR"
}
}
3. Rozwiązanie przydatne w przypadku w którym wiemy, że "duże dokumenty" zawierają nieistotne informacje i możemy pozwolić sobie na utratę.
W tym wypadku dokument który zostanie odbity od kafki, wróci do kolejki tylko na 1 próbę, a następnie zostanie porzucony, nie zapychając tym samym pipeline logstasha.
Code: W sekcji output należy dodać parametr:
retries=> 1
|