Конвейер обработки
Конвейер обработки — это последовательность процессоров, которые применяются к документам по мере их добавления в индекс. Каждый процессор в конвейере выполняет определенную задачу, например фильтрацию, преобразование или добавление данных.
Процессор — это настраиваемые задачи, которые выполняются в последовательном порядке, в котором они указаны в теле запроса. Этот порядок важен, так как каждый процессор зависит от результатов предыдущего процессора. Измененные документы появляются в индексе после применения процессоров.
Конвейеры Подсистемы работают внутри кластера, в то время как Data Prepper – это внешний компонент, который работает в кластере.
Конвейеры Подсистемы выполняют действия с индексами и предпочтительны для случаев использования, связанных с предварительной обработкой простых наборов данных, процессорами машинного обучения (ML) и процессорами векторного встраивания. Конвейеры рекомендуются для простой предварительной обработки данных и небольших наборов данных.
Data Prepper рекомендуется использовать для любых задач по обработке данных, которые он поддерживает, особенно при работе с большими наборами данных и сложными требованиями к предварительной обработке данных. Он упрощает процесс передачи и извлечения больших наборов данных, предоставляя широкие возможности для сложных операций по подготовке и преобразованию данных.
Примечание – Конвейерами Подсистемы можно управлять только с помощью операций Ingest API.
Предварительные условия
Предварительные условия для использования встроенных конвейеров:
- при использовании обработки в рабочей среде кластер должен содержать хотя бы один узел с разрешением на роли узлов, установленным на ingest;
- если плагин Security включен, должно быть разрешение cluster_manage_pipelines на управление конвейерами обработки.
Определение конвейера
Определение конвейера описывает последовательность конвейера обработки данных и может быть записано в формате JSON. Конвейер обработки данных состоит из следующих этапов:
{
"description" : "..."
"processors" : [...]
}
Параметры тела запроса приведены в таблице 45.
Создание конвейера
Для создания конвейера или обновления используют операцию API. Следует обратить внимание, что конвейер требует определения хотя бы одного процессора, который указывает, как изменять документы.
В следующей команде нужно заменить
PUT _ingest/pipeline/<pipeline-id>
Ниже приведен пример запроса в формате JSON, который создает конвейер приема с двумя процессорами set и процессором uppercase. Первый процессор set устанавливает grad_year в значение 2023, а второй процессор set устанавливает graduated в значение true. Процессор uppercase преобразует поле name в верхний регистр.
PUT _ingest/pipeline/my-pipeline
{
"description": "This pipeline processes student data",
"processors": [
{
"set": {
"description": "Sets the graduation year to 2023",
"field": "grad_year",
"value": 2023
}
},
{
"set": {
"description": "Sets graduated to true",
"field": "graduated",
"value": true
}
},
{
"uppercase": {
"field": "name"
}
}
]
}
В таблице 46 перечислены поля тела запроса, используемые для создания или обновления конвейера.
В таблице 47 перечислены параметры пути.
В таблице 48 перечислены параметры запроса.
Некоторые параметры процессора поддерживают фрагменты шаблонов Mustache. Чтобы получить значение поля, нужно заключить имя поля в три фигурные скобки, например }.
Имитация конвейера
Для запуска или тестирования конвейера используют операцию simulate.
Следующие запросы имитируют последний созданный входной конвейер:
GET _ingest/pipeline/_simulate
POST _ingest/pipeline/_simulate
Следующие запросы имитируют единый конвейер на основе идентификатора конвейера:
GET _ingest/pipeline/<pipeline-id>/_simulate
POST _ingest/pipeline/<pipeline-id>/_simulate
В таблице 49 перечислены поля тела запроса, используемые для запуска конвейера.
Поле docs может включать подполя, перечисленные в таблице 50.
В таблице 51 перечислены параметры запроса для запуска конвейера.
Получение информации о конвейере
Для получения всей информации о конвейере используют операцию API.
Следующий пример запроса возвращает информацию обо всех конвейерах:
GET _ingest/pipeline/
Следующий пример запроса возвращает информацию о конкретном конвейере, в данном примере это my-pipeline:
GET _ingest/pipeline/my-pipeline
Ответ содержит информацию о конвейере:
{
"my-pipeline": {
"description": "This pipeline processes student data",
"processors": [
{
"set": {
"description": "Sets the graduation year to 2023",
"field": "grad_year",
"value": 2023
}
},
{
"set": {
"description": "Sets graduated to true",
"field": "graduated",
"value": true
}
},
{
"uppercase": {
"field": "name"
}
}
]
}
}
Удаление конвейера
Чтобы удалить конкретный конвейер, передают идентификатор конвейера в качестве параметра в запросе:
DELETE /_ingest/pipeline/<pipeline-id>
Чтобы удалить все конвейеры в кластере, используют подстановочный знак "*":
DELETE /_ingest/pipeline/*
Устранение неисправностей конвейера
Каждый конвейер обработки состоит из ряда процессоров, которые последовательно применяются к документам. Если один из процессоров выйдет из строя, весь конвейер выйдет из строя. Существуют два варианта обработки сбоев:
- Сбой всего конвейера – если процессор выйдет из строя, весь конвейер выйдет из строя, и документ не будет проиндексирован.
- Завершение работы текущего процессора и переход к следующему – это может быть полезно, если требуется продолжить обработку документа, даже если один из процессоров выйдет из строя.
По умолчанию конвейер обработки останавливается, если один из его процессоров выходит из строя. Если требуется, чтобы конвейер продолжал работать при сбое процессора, можно установить для параметра ignore_failure этого процессора значение true при создании конвейера:
PUT _ingest/pipeline/my-pipeline/
{
"description": "Rename 'provider' field to 'cloud.provider'",
"processors": [
{
"rename": {
"field": "provider",
"target_field": "cloud.provider",
"ignore_failure": true
}
}
]
}
Можно указать параметр on_failure для запуска сразу после сбоя процессора. Если указан on_failure, Подсистема запустит другие процессоры в конвейере, даже если конфигурация on_failure пуста:
PUT _ingest/pipeline/my-pipeline/
{
"description": "Add timestamp to the document",
"processors": [
{
"date": {
"field": "timestamp_field",
"formats": ["yyyy-MM-dd HH:mm:ss"],
"target_field": "@timestamp",
"on_failure": [
{
"set": {
"field": "ingest_error",
"value": "failed"
}
}
]
}
}
]
}
Если процессор выходит из строя, Подсистема регистрирует сбой и продолжает работу всех остальных процессоров в конвейере поиска. Чтобы проверить, были ли сбои, можно использовать показатели конвейера обработки.
Чтобы просмотреть показатели конвейера обработки, используют Nodes Stats API:
GET /_nodes/stats/ingest?filter_path=nodes.*.ingest
Ответ содержит статистику для всех конвейеров, например:
{
"nodes": {
"iFPgpdjPQ-uzTdyPLwQVnQ": {
"ingest": {
"total": {
"count": 28,
"time_in_millis": 82,
"current": 0,
"failed": 9
},
"pipelines": {
"user-behavior": {
"count": 5,
"time_in_millis": 0,
"current": 0,
"failed": 0,
"processors": [
{
"append": {
"type": "append",
"stats": {
"count": 5,
"time_in_millis": 0,
"current": 0,
"failed": 0
}
}
}
]
},
"remove_ip": {
"count": 5,
"time_in_millis": 9,
"current": 0,
"failed": 2,
"processors": [
{
"remove": {
"type": "remove",
"stats": {
"count": 5,
"time_in_millis": 8,
"current": 0,
"failed": 2
}
}
}
]
}
}
}
}
}
}
При устранении неполадок в конвейере обработки данных первое, что следует сделать, – это проверить журналы на наличие ошибок или предупреждений, которые помогут определить причину сбоя. Журналы Подсистемы содержат информацию о конвейере обработки данных, который вышел из строя, в том числе о вышедшем из строя процессоре и причине сбоя.
Процессоры обработки
Процессоры обработки данных являются основным компонентом конвейеров обработки данных. Они выполняют предварительную обработку документов перед индексированием. Например, можно удалять поля, извлекать значения из текста, преобразовывать форматы данных или добавлять дополнительную информацию.
Подсистема предоставляет стандартный набор процессоров данных в рамках текущей установки. Чтобы просмотреть список доступных в Подсистеме процессоров, используют операцию Nodes Info API:
GET /_nodes/ingest?filter_path=nodes.*.ingest.processors
Чтобы настроить и развернуть процессоры, следует убедиться, что имеются необходимые разрешения и права доступа.
Типы процессоров и их обязательные или необязательные параметры зависят от конкретного сценария использования. Подсистема поддерживает процессоры для обработки данных, перечисленные в таблице 52.
Настройки ограничений процессоров
Можно ограничить количество процессоров для обработки данных с помощью настройки кластера cluster.ingest.max_number_processors. Общее количество процессоров включает как количество процессоров, так и количество процессоров on_failure.
Значение по умолчанию для cluster.ingest.max_number_processors — Integer.MAX_VALUE. При добавлении большего количества процессоров, чем указано в cluster.ingest.max_number_processors, возникнет ошибка IllegalStateException.
Процессоры с поддержкой пакетной обработки
Некоторые процессоры поддерживают пакетную обработку — они могут обрабатывать несколько документов одновременно в пакетном режиме. Такие процессоры обычно обеспечивают более высокую производительность при пакетной обработке. Для пакетной обработки используют Bulk API и указывают параметр batch_size. Все процессоры с поддержкой пакетной обработки имеют пакетный режим и режим обработки одного документа. При обработке документов с помощью метода PUT процессор работает в режиме обработки одного документа и обрабатывает документы последовательно. В настоящее время только процессоры text_embedding и sparse_encoding поддерживают пакетную обработку. Все остальные процессоры обрабатывают документы по одному за раз.
Выборочное включение процессоров
Процессоры, определенные в модуле ingest-common, можно выборочно включить, указав параметр ingest-common.processors.allowed кластера. Если параметр не указан, то по умолчанию включены все процессоры. Указание пустого списка отключает все процессоры. Если параметр изменен для удаления ранее включенных процессоров, то любой конвейер, использующий отключенный процессор, выйдет из строя после перезапуска узла, когда вступит в силу новый параметр.
Описание процессоров
Ниже приведено описание процессоров.
Процессор добавления append (add_entries – для Data Prepper) –используется для добавления значений в поле:
- если поле является массивом, добавляет указанные значения в этот массив;
- если поле является скалярным, преобразует его в массив и добавляет в этот массив указанные значения;
- если поле не существует, создает массив с указанными значениями.
Синтаксис:
{
"append": {
"field": "your_target_field",
"value": ["your_appended_value"]
}
}
В таблице 53 перечислены обязательные и необязательные параметры для процессора append.
Процессор байтов bytes – преобразует значение байта, понятное человеку, в эквивалентное значение в байтах. Поле может быть скалярным или массивом. Если поле является скалярным, значение преобразуется и сохраняется в поле. Если поле является массивом, преобразуются все значения массива.
Синтаксис
{
"bytes": {
"field": "your_field_name"
}
}
В таблице 54 перечислены обязательные и необязательные параметры для процессора bytes.
Процессор конвертации convert (convert_entry_type – для Data Prepper) –преобразует поле в документе в другой тип, например, строку в целое число или целое число в строку. Для поля массива преобразуются все значения в массиве.
Синтаксис:
{
"convert": {
"field": "field_name",
"type": "type-value"
}
}
В таблице 55 перечислены обязательные и необязательные параметры для процессора convert.
Процессор копирования copy (copy_values – для Data Prepper) – копирует весь объект из существующего поля в другое поле.
Синтаксис:
{
"copy": {
"source_field": "source_field",
"target_field": "target_field",
"ignore_missing": true,
"override_target": true,
"remove_source": true
}
}
В таблице 56 перечислены обязательные и необязательные параметры для процессора copy.
CSV-процессор csv (csv – для Data Prepper) – используется для анализа CSV-файлов и сохранения их в виде отдельных полей в документе. Процессор игнорирует пустые поля.
Синтаксис:
{
"csv": {
"field": "field_name",
"target_fields": ["field1, field2, ..."]
}
}
В таблице 57 перечислены обязательные и необязательные параметры для процессора csv.
Процессор даты date (для Data Prepper – date) – используется для анализа дат в полях документа и добавления проанализированных данных в новое поле. По умолчанию проанализированные данные сохраняются в поле @timestamp.
Синтаксис:
{
"date": {
"field": "date_field",
"formats": ["yyyy-MM-dd'T'HH:mm:ss.SSSZZ"]
}
}
В таблице 58 перечислены обязательные и необязательные параметры для процессора date.
Процессор community_id – используется для создания хеша сетевого потока идентификатора сообщества. Алгоритм создания хеша потока идентификатора сообщества определяется в спецификации идентификатора сообщества. Сгенерированное процессором значение хеша можно использовать для сопоставления всех связанных сетевых событий, чтобы отфильтровать данные сетевого потока по значению хеша или сгенерировать статистику, объединив данные по полю хеша. Процессор поддерживает сетевые протоколы TCP, UDP, SCTP, ICMP и IPv6-ICMP. Для создания значения хеша используется алгоритм SHA-1.
Синтаксис:
{
"community_id": {
"source_ip_field": "source_ip",
"source_port_field": "source_port",
"destination_ip_field": "destination_ip",
"destination_port_field": "destination_port",
"iana_protocol_number_field": "iana_protocol_number",
"source_port_field": "source_port",
"target_field": "community_id"
}
}
В таблице 59 перечислены обязательные и необязательные параметры для процессора community_id.
Процессор имени индекса даты date_index_name – используется для указания документов на правильный индекс на основе времени в соответствии с полем даты или метки времени в документе. Процессор устанавливает для поля метаданных _index выражение математического индекса даты. Затем процессор извлекает дату или метку времени из поля field в обрабатываемом документе и форматирует ее в выражение математического индекса даты. Извлеченная дата, значение index_name_prefix и значение date_rounding затем объединяются для создания выражения математического индекса даты. Например, если поле field содержит значение 2023-10-30T12:43:29.000Z и index_name_prefix установлено на week_index- и date_rounding установлено на "w", то выражение математического индекса даты будет week_index-2023-10-30. Можно использовать поле date_formats для указания формата даты в выражении математического индекса даты.
Синтаксис:
{
"date_index_name": {
"field": "your_date_field or your_timestamp_field",
"date_rounding": "rounding_value"
}
}
В таблице 60 перечислены обязательные и необязательные параметры для процессора date_index_name.
Процессор извлечения dissect (для Data Prepper – dissect) – извлекает значения из текстового поля документа и сопоставляет их с отдельными полями на основе шаблонов разделения. Процессор хорошо подходит для извлечения полей из сообщений журнала с известной структурой. В отличие от процессора grok процессор dissect не использует регулярные выражения и имеет более простой синтаксис.
Синтаксис:
{
"dissect": {
"field": "source_field",
"pattern": "%{dissect_pattern}"
}
}
В таблице 61 перечислены обязательные и необязательные параметры для процессора dissect.
Процессор расширения точек dot_expander — это инструмент, который помогает работать с иерархическими данными. Он преобразует поля, содержащие точки, в поля объектов, делая их доступными для других процессоров в конвейере. Без этого преобразования поля с точками не могут быть обработаны.
Синтаксис:
{
"dot_expander": {
"field": "field.to.expand"
}
}
В таблице 62 перечислены обязательные и необязательные параметры для процессора dot_expander.
Процессор сброса drop (для Data Prepper – drop_events) – используется для удаления документов без их индексации. Это может быть полезно для предотвращения индексации документов на основе определенных условий. Например, можно использовать процессор drop для предотвращения индексации документов, в которых отсутствуют важные поля или содержится конфиденциальная информация.
Процессор drop не выдает никаких ошибок при удалении документов, что делает его полезным для предотвращения проблем с индексированием без засорения журналов Подсистемы сообщениями об ошибках.
Синтаксис:
{
"drop": {
"if": "ctx.foo == 'bar'"
}
}
В таблице 63 перечислены обязательные и необязательные параметры для процессора drop.
Процессор прерывания fail – полезен для преобразования и добавления данных в процессе индексирования. Основное назначение процессора — прерывать операцию индексирования при выполнении определенных условий.
Синтаксис:
"fail": {
"if": "ctx.foo == 'bar'",
"message": "Custom error message"
}
В таблице 64 перечислены обязательные и необязательные параметры для процессора fail.
Процессор создания хеш-значений fingerprint – используется для создания хеш-значения для определенных заданных полей или для всех полей в документе. Хеш-значение можно использовать для удаления дубликатов документов в индексе и сокращения результатов поиска.
Для каждого поля значение имени, длина значения и само значение объединяются и разделяются символом конвейера "|". Например, если имя поля равно field1, а значение равно value1, то объединенная строка будет |field1|3:value1|field2|10:value2|. Для полей объектов имя поля сглаживается путем соединения имен вложенных полей точкой ".". Например, если поле объекта является root_field с подполем sub_field1, имеющим значение value1, и другим подполем sub_field2, имеющим значение value2, то объединенная строка будет таковой |root_field.sub_field1|1:value1|root_field.sub_field2|100:value2|.
Синтаксис:
{
"community_id": {
"fields": ["foo", "bar"],
"target_field": "fingerprint",
"hash_method": "SHA-1@2.16.0"
}
}
В таблице 65 перечислены обязательные и необязательные параметры для процессора fingerprint.
Процессор перебора foreach – используется для перебора списка значений во входном документе и применения преобразования к каждому значению. Это может быть полезно для таких задач, как последовательная обработка всех элементов в массиве, например, преобразование всех элементов строки в нижний или верхний регистр.
Синтаксис:
{
"foreach": {
"field": "<field_name>",
"processor": {
"<processor_type>": {
"<processor_config>": "<processor_value>"
}
}
}
}
В таблице 66 перечислены обязательные и необязательные параметры для процессора foreach.
Процессор gsub – выполняет поиск и замену по регулярному выражению в строковых полях входящих документов. Если поле содержит массив строк, операция применяется ко всем элементам массива. Однако если поле содержит не строковые значения, процессор генерирует исключение. Примеры использования процессора gsub включают удаление конфиденциальной информации из сообщений журнала или пользовательского контента, нормализацию форматов или соглашений о данных (например, преобразование форматов дат, удаление специальных символов), а также извлечение или преобразование подстрок из значений полей для дальнейшей обработки или анализа.
Синтаксис:
"gsub": {
"field": "field_name",
"pattern": "regex_pattern",
"replacement": "replacement_string"
}
В таблице 67 перечислены обязательные и необязательные параметры для процессора gsub.
Процессор ip2geo – добавляет информацию о географическом местоположении IPv4- или IPv6-адреса. Процессор ip2geo использует данные геолокации IP (GeoIP) из внешней endpoint и поэтому требует наличия дополнительного компонента datasource, который определяет, откуда загружать данные GeoIP и как часто их обновлять.
Примечание – Процессор ip2geo сохраняет сопоставление данных GeoIP в системных индексах. Сопоставление GeoIP извлекается из этих индексов во время приема данных для преобразования IP-адреса в геолокацию для входящих данных. Для оптимальной производительности предпочтительно иметь узел с ролями приема и хранения данных, так как эта конфигурация позволяет избежать вызовов между узлами, что снижает задержку. Кроме того, поскольку процессор ip2geo ищет данные сопоставления GeoIP в индексах, это влияет на производительность поиска.
Чтобы начать работу с процессором ip2geo, необходимо установить плагин opensearch-geospatial.
Источник данных IP2Geo и настройки узла процессора перечислены в таблице 68. Все настройки в этой таблице являются динамическими.
Перед созданием конвейера, в котором используется процессор ip2geo, нужно создать источник данных IP2Geo. Источник данных определяет значение endpoint, с которой будут загружаться данные GeoIP, и указывает интервал обновления.
Подсистема предоставляет следующие конечные точки для баз данных GeoLite2 City, GeoLite2 Country и GeoLite2 ASN от MaxMind, которые распространяются по лицензии CC BY-SA 4.0:
- GeoLite2 City – https://geoip.maps.opensearch.org/v1/geolite2-city/manifest.json;
- Страна GeoLite2 – https://geoip.maps.opensearch.org/v1/geolite2-country/manifest.json;
- GeoLite2 ASN – https://geoip.maps.opensearch.org/v1/geolite2-asn/manifest.json.
Если кластер Подсистемы не может обновить источник данных с конечных точек в течение 30 дней, кластер не добавляет данные GeoIP в документы, а вместо этого добавляет "error":"ip2geo_data_expired".
В таблице 69 перечислены параметры источника данных для процессора ip2geo.
Чтобы создать источник данных IP2Geo, выполняют следующий запрос:
PUT /_plugins/geospatial/ip2geo/datasource/my-datasource
{
"endpoint" : "https://geoip.maps.opensearch.org/v1/geolite2-city/manifest.json",
"update_interval_in_days" : 3
}
Ответ true означает, что запрос был успешным и сервер смог его обработать. Ответ false указывает на то, что следует проверить запрос, чтобы убедиться, что он корректен, проверить URL-адрес, чтобы убедиться, что он правильный, или повторить попытку.
Чтобы обновить источник даты, выполняют следующий запрос:
PUT /_plugins/geospatial/ip2geo/datasource/my-datasource/_settings
{
"endpoint": https://geoip.maps.opensearch.org/v1/geolite2-city/manifest.json,
"update_interval_in_days": 10
}
Чтобы удалить источник данных IP2Geo, сначала необходимо удалить все процессоры, связанные с источником данных. В противном случае запрос не будет выполнен.
Чтобы удалить источник данных, выполняют следующий запрос:
DELETE /_plugins/geospatial/ip2geo/datasource/my-datasource
После создания источника данных, можно создать конвейер:
{
"ip2geo": {
"field":"ip",
"datasource":"my-datasource"
}
}
В таблице 70 перечислены обязательные и необязательные параметры для процессора ip2geo.
Процессор Grok (для Data Prepper – grok) – используется для анализа и структурирования неструктурированных данных с помощью сопоставления шаблонов. С помощью процессора grok можно извлекать поля из сообщений журналов, журналов доступа веб-серверов, журналов приложений и других данных журналов, которые соответствуют единому формату.
Процессор grok использует набор предопределенных шаблонов для сопоставления частей входного текста. Каждый шаблон состоит из имени и регулярного выражения. Например, шаблон %{IP:ip_address} сопоставляет IP-адрес и присваивает его полю ip_address. Можно комбинировать несколько шаблонов для создания более сложных выражений. Например, шаблон %{IP:client} %{WORD:method} %{URIPATHPARM:request} %{NUMBER:bytes %NUMBER:duration} сопоставляет строку из журнала доступа веб-сервера и извлекает IP-адрес клиента, метод HTTP, URI запроса, количество отправленных байтов и продолжительность запроса.
Процессор grok построен на библиотеке регулярных выражений Oniguruma и поддерживает все шаблоны из этой библиотеки. Можно использовать инструмент Grok Debugger для тестирования и отладки grok-выражений.
Синтаксис:
{
"grok": {
"field": "your_message",
"patterns": ["your_patterns"]
}
}
Для настройки процессора grok есть различные параметры, которые позволяют задавать шаблоны, сопоставлять определенные ключи и управлять поведением процессора. В таблице 71 перечислены обязательные и дополнительные параметры процессора grok.
Можно использовать шаблоны по умолчанию или добавить собственные шаблоны в свои конвейеры с помощью параметра patterns_definitions. Собственные шаблоны grok можно использовать в конвейере для извлечения структурированных данных из сообщений журнала, которые не соответствуют встроенным шаблонам grok. Это может быть полезно для анализа сообщений журнала из пользовательских приложений или для анализа сообщений журнала, которые были каким-либо образом изменены. Собственные шаблоны имеют простую структуру: каждый шаблон имеет уникальное имя и соответствующее регулярное выражение, определяющее его поведение при сопоставлении.
Процессор HTML-тегов html_strip – удаляет HTML-теги из строковых полей во входящих документах. Этот процессор полезен при индексировании данных с веб-страниц или из других источников, которые могут содержать HTML-разметку. HTML-теги заменяются символами новой строки "\n".
Синтаксис:
{
"html_strip": {
"field": "webpage"
}
}
В таблице 72 перечислены обязательные и необязательные параметры для процессора html_strip.
Процессор объединения join – объединяет элементы массива в одно строковое значение, используя указанный разделитель между каждым элементом. Он генерирует исключение, если предоставленные входные данные не являются массивом.
Синтаксис:
{
"join": {
"field": "field_name",
"separator": "separator_string"
}
}
В таблице 73 перечислены обязательные и необязательные параметры для процессора join.
Процессор JSON – преобразует строковое значение в карту карт, которая может быть полезна для различных задач по обработке и добавлению данных.
Синтаксис:
{
"processor": {
"json": {
"field": "<field_name>",
"target_field": "<target_field_name>",
"add_to_root": <boolean>
}
}
}
В таблице 74 перечислены обязательные и необязательные параметры для процессора JSON.
Процессор kv (для Data Prepper – key_value) – автоматически извлекает определенные поля событий или сообщения в формате key=value. Этот структурированный формат упорядочивает данные, группируя их по ключам и значениям. Он полезен для анализа, визуализации и использования данных, например для анализа поведения пользователей, оптимизации производительности или расследований в сфере безопасности.
Синтаксис:
{
"kv": {
"field": "message",
"field_split": " ",
"value_split": " "
}
}
В таблице 75 перечислены обязательные и необязательные параметры для процессора kv.
Процессор строчных букв lowercase (для Data Prepper – lowercase_string) – преобразует весь текст в определенном поле в строчные буквы.
Синтаксис:
{
"lowercase": {
"field": "field_name"
}
}
В таблице 76 перечислены обязательные и необязательные параметры для процессора lowercase.
Процессор вызова ML ml_inference – используется для вызова моделей машинного обучения (ML), зарегистрированных в плагине ML Commons. Выходные данные модели добавляются в качестве новых полей в обрабатываемые документы.
Следует обратить внимание, что перед использованием процессора ml_inference должна быть сформирована либо локальная модель машинного обучения, размещенная в кластере Подсистемы, либо модель, размещенная на внешнем сервере и подключенная к кластеру с помощью плагина ML Commons.
Синтаксис:
{
"ml_inference": {
"model_id": "<model_id>",
"function_name": "<function_name>",
"full_response_path": "<full_response_path>",
"model_config":{
"<model_config_field>": "<config_value>"
},
"model_input": "<model_input>",
"input_map": [
{
"<model_input_field>": "<document_field>"
}
],
"output_map": [
{
"<new_document_field>": "<model_output_field>"
}
],
"override": "<override>"
}
}
Процессор конвейера pipeline – позволяет конвейеру ссылаться на другой предопределенный конвейер и включать его в свой состав. Это может быть полезно, если есть набор общих процессоров, которые необходимо использовать в нескольких конвейерах. Вместо того чтобы переопределять эти общие процессоры в каждом конвейере, можно создать отдельный базовый конвейер, содержащий общие процессоры, а затем ссылаться на этот базовый конвейер из других конвейеров с помощью процессора конвейера.
Синтаксис:
{
"pipeline": {
"name": "general-pipeline"
}
}
В таблице 77 перечислены обязательные и необязательные параметры для процессора pipeline.
Процессор remove_by_pattern – удаляет поля корневого уровня из документа с помощью заданных шаблонов подстановочных знаков.
Синтаксис:
{
"remove_by_pattern": {
"field_pattern": "field_name_prefix*"
}
}
В таблице 78 перечислены обязательные и необязательные параметры для процессора remove_by_pattern.
Процессор удаления remove – используется для удаления поля из документа.
Синтаксис:
{
"remove": {
"field": "field_name"
}
}
В таблице 79 перечислены обязательные и необязательные параметры для процессора remove.
Процессор переименования rename – используется для переименования существующего поля, а также для перемещения поля из одного объекта в другой или на корневой уровень.
Синтаксис:
{
"rename": {
"field": "field_name",
"target_field" : "target_field_name"
}
}
В таблице 80 перечислены обязательные и необязательные параметры для процессора rename.
Процессор сценариев script – выполняет встроенные и сохраненные скрипты, которые могут изменять или преобразовывать данные в документе в процессе обработки. Процессор использует кеширование скриптов для повышения производительности, поскольку скрипты могут перекомпилироваться для каждого документа.
Синтаксис:
{
"processor": {
"script": {
"source": "<script_source>",
"lang": "<script_language>",
"params": {
"<param_name>": "<param_value>"
}
}
}
}
В таблице 81 перечислены обязательные и необязательные параметры для процессора script.
Процессор добавления set – добавляет или обновляет поля в документе. Он устанавливает одно поле и связывает его с указанным значением. Если поле уже существует, то его значение заменяется на указанное, если только параметр override не установлен в false. Если override равно false и указанное поле существует, то значение поля остается неизменным.
Синтаксис:
{
"description": "...",
"processors": [
{
"set": {
"field": "new_field",
"value": "some_value"
}
}
]
}
В таблице 82 перечислены обязательные и необязательные параметры для процессора set.
Процессор разреженного кодирования sparse_encoding – используется для создания разреженного вектора/токена и весовых коэффициентов из текстовых полей для нейронного разреженного поиска с использованием разреженного поиска.
Следует обратить внимание, что перед использованием процессора sparse_encoding необходимо настроить модель машинного обучения (ML).
Синтаксис:
{
"sparse_encoding": {
"model_id": "<model_id>",
"field_map": {
"<input_field>": "<vector_field>"
}
}
}
В таблице 83 перечислены обязательные и необязательные параметры для процессора sparse_encoding.
Процессор сортировки sort – сортирует массив элементов в порядке возрастания или убывания. Числовые массивы сортируются по числовому значению, а строковые или смешанные массивы (строки и числа) сортируются лексикографически. Если входные данные не являются массивом, процессор выдает ошибку.
Синтаксис:
{
"description": "Sort an array of items",
"processors": [
{
"sort": {
"field": "my_array_field",
"order": "desc"
}
}
]
}
В таблице 84 перечислены обязательные и необязательные параметры для процессора sort.
Процессор разбиения текста на фрагменты text_chunking – разбивает длинный документ на более короткие фрагменты. Процессор поддерживает следующие алгоритмы разбиения текста:
- fixed_token_length – разбивает текст на фрагменты указанного размера;
- delimiter – разбивает текст на отрывки с помощью разделителя.
Синтаксис:
{
"text_chunking": {
"field_map": {
"<input_field>": "<output_field>"
},
"algorithm": {
"<name>": "<parameters>"
}
}
}
В таблице 85 перечислены обязательные и необязательные параметры для процессора text_chunking.
Примечание – Чтобы выполнить группировку вложенных полей, указать значения input_field и output_field в виде объектов JSON. Дочерние пути вложенных полей не поддерживаются. Например, используют "field_map": { "foo": { "bar": "bar_chunk"} } вместо "field_map": { "foo.bar": "foo.bar_chunk"}.
В таблице 86 перечислены необязательные параметры для алгоритма фиксированной длины токена fixed_token_length.
Примечания – Значение по умолчанию для token_limit равно 384, чтобы выходные данные не превышали ограничение на количество токенов в моделях встраивания текста. Для предварительно обученных моделей, поддерживаемых Подсистемой, таких как msmarco-distilbert-base-tas-b и opensearch-neural-sparse-encoding-v1, ограничение на количество входных токенов равно 512. Токенизатор standard разбивает текст на слова. Согласно OpenAI, 1 токен равен примерно 0,75 слова в английском тексте. Ограничение на количество токенов по умолчанию рассчитывается как 512 * 0,75 = 384.
Можно установить значение overlap_rate в диапазоне от 0 до 0,5 включительно. Для Amazon Bedrock рекомендуется устанавливать значение этого параметра от 0 до 0,2 для повышения точности.
Параметр max_chunk_limit ограничивает количество фрагментов. Если количество фрагментов, сгенерированных процессором, превышает лимит, алгоритм выдаст исключение и предложит увеличить или отключить лимит.
В таблице 87 перечислены необязательные параметры для алгоритма определения разделителя delimiter.
Примечание – Параметр max_chunk_limit ограничивает количество фрагментов. Если количество фрагментов, сгенерированных процессором, превышает лимит, алгоритм выдаст исключение и предложит увеличить или отключить лимит.
Процессор для встраивания текста text_embedding – используется для создания векторных представлений текстовых полей для семантического поиска.
Следует обратить внимание, что перед использованием процессора text_embedding необходимо настроить модель машинного обучения (ML).
Синтаксис:
{
"text_embedding": {
"model_id": "<model_id>",
"field_map": {
"<input_field>": "<vector_field>"
}
}
}
В таблице 88 перечислены обязательные и необязательные параметры для процессора text_embedding.
Процессор разделения split используется для разделения строкового поля на массив подстрок на основе заданного разделителя.
Синтаксис:
{
"split": {
"field": "field_to_split",
"separator": "<delimiter>",
"target_field": "split_field"
}
}
В таблице 89 перечислены обязательные и необязательные параметры для процессора split.
Процессор встраивания текста/изображений text_image_embedding используется для создания комбинированных векторных представлений из текстовых и графических полей для мультимодального нейронного поиска.
Следует обратить внимание, что перед использованием процессора text_image_embedding необходимо настроить модель машинного обучения (ML).
Синтаксис:
{
"text_image_embedding": {
"model_id": "<model_id>",
"embedding": "<vector_field>",
"field_map": {
"text": "<input_text_field>",
"image": "<input_image_field>"
}
}
}
В таблице 90 перечислены обязательные и необязательные параметры для процессора text_image_embedding.
Процессор обрезки trim используется для удаления начальных и конечных пробелов из указанного поля.
Синтаксис:
{
"trim": {
"field": "field_to_trim",
"target_field": "trimmed_field"
}
}
В таблице 91 перечислены обязательные и необязательные параметры для процессора trim.
Процессор заглавных букв uppercase (для Data Prepper – uppercase_string) – преобразует весь текст в определенном поле в заглавные буквы.
Синтаксис:
{
"uppercase": {
"field": "field_name"
}
}
В таблице 92 перечислены обязательные и необязательные параметры для процессора uppercase.
Процессор декодирования URL-адресов urldecode – полезен для декодирования строк в кодировке URL в данных журнала или других текстовых полях. Это позволяет сделать данные более читаемыми и удобными для анализа, особенно при работе с URL-адресами или параметрами запроса, содержащими специальные символы или пробелы.
Синтаксис:
{
"urldecode": {
"field": "field_to_decode",
"target_field": "decoded_field"
}
}
В таблице 93 перечислены обязательные и необязательные параметры для процессора urldecode.
Процессор пользовательского агента user_agent – используется для извлечения информации из строки пользовательского агента, такой как браузер, устройство и операционная система, используемые клиентом. Процессор user_agent особенно полезен для анализа поведения пользователей и выявления тенденций на основе пользовательских устройств, операционных систем и браузеров. Он также может быть полезен для устранения неполадок, характерных для определенных конфигураций пользовательских агентов.
Синтаксис:
{
"processor": {
"user_agent": {
"field": "user_agent",
"target_field": "user_agent_info"
}
}
}
В таблице 94 перечислены обязательные и необязательные параметры для процессора user_agent.