Monitoring Serwerów - Forum o monitoringu infrastruktury IT
rozmiar dokumentów w Kafka
#1
Sad 
Hej, 
nie jestem pewny czy to dobry dział, czy jednak powinno to wylądować w logstashu, ale trudno, mam następujący problem:

Kafka nie przyjmuje dokumentów ponieważ twierdzi, że dokumenty są za duże.

Zwiększanie limitów nic nie daje, bo dotarłem do poziomu 10MB i dalej niektórych zdarzeń logstash nie jest w stanie wysłać do kafki.

Po czasie skutkuje to zapełnieniem kolejki w logstashu, co w konsekwencji prowadzi do zawieszenia całego pipeline...


Logi z logstasha:

Code:
[2020-09-03T00:53:38,603][WARN ][logstash.outputs.kafka   ] KafkaProducer.send() failed: org.apache.kafka.common.er
rors.RecordTooLargeException: The message is 1223210 bytes when serialized which is larger than the maximum request
size you have configured with the max.request.size configuration. {:exception=>java.util.concurrent.ExecutionExcep
tion: org.apache.kafka.common.errors.RecordTooLargeException: The message is 1223210 bytes when serialized which is
larger than the maximum request size you have configured with the max.request.size configuration.}
[2020-09-03T00:53:38,644][INFO ][logstash.outputs.kafka   ] Sending batch to Kafka failed. Will retry after a delay
. {:batch_size=>1, :failures=>1, :sleep=>0.1}
[2020-09-03T00:53:38,769][WARN ][logstash.outputs.kafka   ] KafkaProducer.send() failed: org.apache.kafka.common.er
rors.RecordTooLargeException: The message is 1223210 bytes when serialized which is larger than the maximum request
size you have configured with the max.request.size configuration. {:exception=>java.util.concurrent.ExecutionExcep
tion: org.apache.kafka.common.errors.RecordTooLargeException: The message is 1223210 bytes when serialized which is
larger than the maximum request size you have configured with the max.request.size configuration.}
[2020-09-03T00:53:38,770][INFO ][logstash.outputs.kafka   ] Sending batch to Kafka failed. Will retry after a delay
. {:batch_size=>1, :failures=>1, :sleep=>0.1}
[2020-09-03T00:53:38,878][INFO ][logstash.outputs.kafka   ] Exhausted user-configured retry count when sending to K
afka. Dropping these events. {:max_retries=>1, :drop_count=>1}
[2020-09-03T02:15:12,763][WARN ][logstash.outputs.kafka   ] KafkaProducer.send() failed: org.apache.kafka.common.er
rors.RecordTooLargeException: The message is 1216262 bytes when serialized which is larger than the maximum request
size you have configured with the max.request.size configuration. {:exception=>java.util.concurrent.ExecutionExcep
tion: org.apache.kafka.common.errors.RecordTooLargeException: The message is 1216262 bytes when serialized which is
larger than the maximum request size you have configured with the max.request.size configuration.}
[2020-09-03T02:15:12,764][INFO ][logstash.outputs.kafka   ] Sending batch to Kafka failed. Will retry after a delay
. {:batch_size=>1, :failures=>1, :sleep=>0.1}
[2020-09-03T02:15:12,871][WARN ][logstash.outputs.kafka   ] KafkaProducer.send() failed: org.apache.kafka.common.er
rors.RecordTooLargeException: The message is 1216262 bytes when serialized which is larger than the maximum request
size you have configured with the max.request.size configuration. {:exception=>java.util.concurrent.ExecutionExcep
tion: org.apache.kafka.common.errors.RecordTooLargeException: The message is 1216262 bytes when serialized which is
larger than the maximum request size you have configured with the max.request.size configuration.}
[2020-09-03T02:15:12,871][INFO ][logstash.outputs.kafka   ] Sending batch to Kafka failed. Will retry after a delay
. {:batch_size=>1, :failures=>1, :sleep=>0.1}


Jak najlepiej rozwiązać powyższy problem?
Reply
#2
W zależności od potrzeb proponuje 3 rozwiązania które powinny się sprawdzić:

1. Tagowanie dokumentów posiadających więcej niż 10000 znaków. w polu message. Taki dokument można w sekcji output skierować np pliku - output file{}, a następnie podejrzeć i odpowiednio sparsować tak żeby pole message było już pocięte na odpowiednie pola. W tym przypadku duże dokumenty będą pomijały output kafki i pipeline logstasha się nie zapełni.



Code:
filter {
        ruby {
                code => "
                        if event.get('message').length > 10000
                        event.tag('TLTR')
                        end
                "
        }
}


2. Truncate + Tagowanie. Dokument zostanie obcięty po określonej liczbie bajtów oraz otagowany tak żeby było wiadomo który message jest ucięty.
W tym przypadku duże dokumenty będą ucinane i zostaną poprawnie odebrane po stronie kafki,a pipeline logstasha się nie zapełni.


Code:
filter {
        truncate {
                fields => ["message"]
                length_bytes => 49999999
                add_tag => "TLTR"
        }
}


3. Rozwiązanie przydatne w przypadku w którym wiemy, że "duże dokumenty" zawierają nieistotne informacje i możemy pozwolić sobie na utratę.
W tym wypadku dokument który zostanie odbity od kafki, wróci do kolejki tylko na 1 próbę, a następnie zostanie porzucony, nie zapychając tym samym pipeline logstasha.


Code:
W sekcji output należy dodać parametr:

retries=> 1
Reply


Forum Jump:

User Panel Messages