env:
  #gateway
  GATEWAY_HOST: 0.0.0.0:18000
  GATEWAY_TLS_ENABLED: false
  ENFORCE_BULK_REQUESTS_ONLY: false # when set to `true`, index deletes, reindex, update_by_query, and delete_by_query are not allowed
  #partition
  REPLICATION_PARTITION_SIZE: 1 # write-ahead-log partition size
  REQUEST_RESHUFFLE_PARTITION_SIZE: 1 # non-bulk requests partition size
  BULK_RESHUFFLE_PARTITION_SIZE: 1 # bulk requests partition size
  BULK_RESHUFFLE_LEVEL: "cluster" #cluster,node,index,shard
  #primary
  PRIMARY_ENDPOINTS: [ 'http://localhost:9200' ]
  PRIMARY_USERNAME: elastic
  PRIMARY_PASSWORD: elastic
  PRIMARY_MAX_QPS_PER_NODE: 10000
  PRIMARY_MAX_BYTES_PER_NODE: 104857600 #100MB/s
  PRIMARY_MAX_CONNECTION_PER_NODE: 200
  PRIMARY_DISCOVERY_ENABLED: false
  PRIMARY_DISCOVERY_REFRESH_ENABLED: false
  #backup
  BACKUP_ENDPOINTS: [ 'http://192.168.3.185:17901' ]
  BACKUP_USERNAME: elastic
  BACKUP_PASSWORD: elastic
  BACKUP_MAX_QPS_PER_NODE: 10000
  BACKUP_MAX_BYTES_PER_NODE: 104857600 #100MB/s
  BACKUP_MAX_CONNECTION_PER_NODE: 200
  BACKUP_DISCOVERY_ENABLED: false
  BACKUP_DISCOVERY_REFRESH_ENABLED: false
  #throttle
  THROTTLE_BULK_INDEXING_MAX_BYTES: 40485760 #40MB/s
  THROTTLE_BULK_INDEXING_MAX_REQUESTS: 40000 #40k docs/s
  THROTTLE_BULK_INDEXING_ACTION: retry #retry,drop
  THROTTLE_BULK_INDEXING_MAX_RETRY_TIMES: 10 #1000
  THROTTLE_BULK_INDEXING_RETRY_DELAY_IN_MS: 100 #10
  #metadata & metrics & logging
  LOGGING_ENDPOINTS: [ 'http://localhost:9200' ]
  LOGGING_USERNAME: admin
  LOGGING_PASSWORD: admin
  LOGGING_MAX_QPS_PER_NODE: 10000
  LOGGING_MAX_BYTES_PER_NODE: 104857600 #100MB/s
  LOGGING_MAX_CONNECTION_PER_NODE: 200
  LOGGING_DISCOVERY_ENABLED: false
  LOGGING_DISCOVERY_REFRESH_ENABLED: false
  #kafka
  KAFKA_HOSTS: [ '192.168.3.185:9092', '192.168.3.185:9093', '192.168.3.185:9094' ]
  #api
  API_HOST: 0.0.0.0:12900
  #disk_queue
  DISK_QUEUE_COMPRESS_SEGMENT_ENABLED: true
  DISK_QUEUE_RETENTION_MAX_NUM_OF_LOCAL_FILES: 30
  DISK_QUEUE_UPLOAD_TO_S3: false
  #s3
  S3_SERVER_NAME: mys3
  S3_LOCATION: beijing
  S3_HOST: "192.168.3.188:9000"
  S3_BUCKET_NAME: "infini-store"
  S3_ACCESS_KEY: "key"
  S3_ACCESS_SECRET: "secret"
  ELASTIC_HEALTH_CHECK_ENABLED: true
  ELASTIC_AVAILABILITY_CHECK_ENABLED: true
  ELASTIC_METADATA_REFRESH_ENABLED: true
  ELASTIC_CLUSTER_SETTINGS_CHECK_ENABLED: false
  #metrics
  METRICS_ENABLED: true
  #floating_ip
  FLOATING_IP_ENABLED: false
  FLOATING_IP_ADDRESS: 192.168.3.1
  FLOATING_IP_MASK: 255.255.255.0
  FLOATING_IP_INTERFACE: eth1

#configs
path.data: data
path.logs: log

api:
  enabled: true
  network:
    binding: $[[env.API_HOST]]

entry:
  - name: my_es_entry
    enabled: true
    router: my_router
    max_concurrency: 10000
    network:
      binding: $[[env.GATEWAY_HOST]]
      reuse_port: true
      # skip_occupied_port: true
    tls:
      enabled: $[[env.GATEWAY_TLS_ENABLED]]

flow:
  - name: auth-flow
    filter:
      # - basic_auth:
      #     valid_users:
      #       ingest: managemDFSDFSDFent
      - set_basic_auth:
          username: $[[env.PRIMARY_USERNAME]]
          password: $[[env.PRIMARY_PASSWORD]]
  - name: set-auth-for-backup-flow
    filter:
      - set_basic_auth: # override with the backup cluster's credentials so the backup cluster can process the request normally
          username: $[[env.BACKUP_USERNAME]]
          password: $[[env.BACKUP_PASSWORD]]
  - name: limit_body_size
    filter:
      - if:
          range:
            _ctx.request.body_length:
              gte: 12800000
        then:
          - set_response:
              body: '{"message":"request body is too large."}'
              status: 400
          - echo:
              message: "request throttled: $[[_ctx.request.method]],$[[_ctx.remote_ip]],body_size:$[[_ctx.request.body_length]] exceeds the threshold" # custom message for the throttling alert
              response: false
              stdout: false
              logging: true
              logging_level: "warn" #debug,info,warn,error
          - drop:
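  # The two client-IP limiter flows below are illustrative throttling policies; the
  # limits (25 write req/s, 10 read req/s) are sample values rather than
  # recommendations. As the commented alternatives show, `action: retry` would queue
  # throttled requests instead of rejecting them with a 429, at the cost of extra
  # latency for the client.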
  - name: limit_write_tps
    filter:
      - request_client_ip_limiter:
          max_requests: 25 #25 req/s
          status: 429
          message: '{write TPS exceeds the threshold}' # custom message for the throttling alert
          # action: retry # queue and retry later
          action: drop # reject the request
          log_warn_message: true
  - name: limit_read_tps
    filter:
      - request_client_ip_limiter:
          max_requests: 10 #10 req/s
          status: 429
          message: '{read QPS exceeds the threshold}' # custom message for the throttling alert
          action: drop # reject the request
          # action: retry # queue and retry later
          log_warn_message: true
  - name: bulking_indexing_limit
    filter:
      - bulk_request_throttle:
          indices:
            # es-loadgen-2022-11-*:
            #   max_requests: 5
            #   action: drop
            #   message: "es-loadgen-2022-11-* doc writes exceed the threshold of 5" # custom message for the throttling alert
            #   log_warn_message: true
            # es-loadgen-2022-12-25:
            #   max_requests: 20
            #   action: drop
            #   message: "es-loadgen-2022-12-25 doc writes exceed the threshold of 20" # custom message for the throttling alert
            #   log_warn_message: true
            # es-loadgen-2022-12-*:
            #   max_requests: 10
            #   action: drop
            #   message: "es-loadgen-2022-12-* doc writes exceed the threshold of 10" # custom message for the throttling alert
            #   log_warn_message: true
            "*":
              max_bytes: $[[env.THROTTLE_BULK_INDEXING_MAX_BYTES]]
              max_requests: $[[env.THROTTLE_BULK_INDEXING_MAX_REQUESTS]]
              action: $[[env.THROTTLE_BULK_INDEXING_ACTION]]
              retry_delay_in_ms: $[[env.THROTTLE_BULK_INDEXING_RETRY_DELAY_IN_MS]]
              max_retry_times: $[[env.THROTTLE_BULK_INDEXING_MAX_RETRY_TIMES]]
              message: "bulk writing too fast" # custom message for the throttling alert
              log_warn_message: true
  - name: primary-read-flow
    filter:
      # - flow:
      #     flows:
      #       - auth-flow
      #       - limit_read_tps
      #       - limit_body_size
      - if:
          cluster_available: [ "primary" ]
        then:
          - elasticsearch:
              elasticsearch: "primary"
              refresh:
                enabled: true
                interval: 30s
        else:
          - elasticsearch:
              elasticsearch: "backup"
              refresh:
                enabled: true
                interval: 30s
  - name: primary-on-failure-write-flow # processing flow used when the primary cluster is unavailable
    filter:
      # option 1: reject the request and let the client decide how to handle it; alternatively, persist it to a queue so no data is lost
      - set_response:
          status: 503
          content_type: "application/json"
          body: '{"error":true,"message":"503 Service Unavailable"}'
      - drop: # stop all subsequent processing
      # # option 2: put the request into the primary-failure queue, which both the primary and the backup consume
      # - bulk_request_mutate: # fix documents with auto-generated ids by generating ids explicitly
      #     fix_null_id: true
      #     fix_null_type: true
      #     default_type: _doc
      #     # type_rename:
      #     #   "*": _doc # unify the index type; useful when migrating multi-type indices from older versions to a newer cluster
      #     remove_pipeline: true
      #     generate_enhanced_id: true
      # - queue:
      #     queue_name: "primary-failure"
      # # proactively check backend health in the request-triggered throttling mode
      # - elasticsearch_health_check:
      #     elasticsearch: "primary"
      # - set_response:
      #     status: 200
      #     content_type: "application/json"
      #     body: '{"error":false,"success":true,"acknowledged":true,"hits":{},"found":true}'
      # - drop: # stop all subsequent processing
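  # A brief sketch of the write path defined next (derived from this file, not an
  # authoritative description): primary-write-flow appends each write to a
  # partitioned write-ahead-log queue, forwards it to the "primary" cluster, and
  # records a first commit log entry; the consumer pipelines defined later replay
  # the write-ahead log against the "backup" cluster, and tracing-flow records a
  # final commit log entry so the replication_correlation pipeline can reconcile
  # the three logs.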
  - name: primary-write-flow # the normal primary write flow
    filter:
      # - flow:
      #     flows:
      #       - auth-flow
      #       - limit_write_tps
      #       - limit_body_size
      #       - bulking_indexing_limit
      - auto_generate_doc_id: # handle non-bulk document creation requests that rely on auto-generated ids
      - if:
          or:
            # - consumer_has_lag:
            #     queue: "primary-failure"
            #     group: primary
            #     name: primary-failure
            - not:
                cluster_available: [ "primary" ]
        then: # the cluster is unavailable, or it is available but has a backlog: do not serve client requests until the service recovers
          - flow:
              flows:
                - primary-on-failure-write-flow
        else: # the cluster is available and has no backlog: forward the request to the cluster directly
          - bulk_request_mutate: # fix documents with auto-generated ids by generating ids explicitly
              fix_null_id: true
              fix_null_type: true
              default_type: _doc
              # type_rename:
              #   "*": _doc # unify the index type; useful when migrating multi-type indices from older versions to a newer cluster
              remove_pipeline: true
              generate_enhanced_id: true
          - hash_mod: # hash requests to different queues
              source: "$[[_ctx.remote_ip]]_$[[_ctx.request.username]]_$[[_ctx.request.path]]_$[[_sys.second_of_now]]" # for debugging only, $[[_sys.second_of_now]] should be removed
              # source: "$[[_ctx.remote_ip]]_$[[_ctx.request.username]]_$[[_ctx.request.path]]"
              target_context_name: "partition_id"
              mod: $[[env.REPLICATION_PARTITION_SIZE]] # hash into REPLICATION_PARTITION_SIZE queues
              add_to_header: true
          - set_context:
              context:
                _ctx.request.header.X-Replicated-ID: $[[_util.increment_id.request_number_id]]_$[[_util.generate_uuid]]
                _ctx.request.header.X-Replicated-Timestamp: $[[_sys.unix_timestamp_of_now]]
                _ctx.request.header.X-Replicated: "true"
          - queue: # handle dirty writes, pre-commit
              queue_name: "primary_write_ahead_log##$[[partition_id]]"
              save_last_produced_message_offset: true
              last_produced_message_offset_key: "LAST_PRODUCED_MESSAGE_OFFSET"
              labels:
                type: "primary_write_ahead_log"
                partition_id: "$[[partition_id]]"
          - elasticsearch: # the cluster is available, handle the request directly
              elasticsearch: "primary"
              max_connection_per_node: 1000
              max_retry_times: 0
              refresh:
                enabled: true
                interval: 30s
          - queue: # handle dirty writes, first-commit
              queue_name: "primary_first_commit_log##$[[partition_id]]"
              labels:
                type: "primary_first_commit_log"
                partition_id: "$[[partition_id]]"
              message: "$[[_ctx.request.header.X-Replicated-ID]]#$[[_ctx.request.header.LAST_PRODUCED_MESSAGE_OFFSET]]#$[[_sys.unix_timestamp_of_now]]"
          - bulk_response_process: # if the bulk request has errors, do not run the following flows; successful, failed, and invalid requests have all been queued, so we can exit here
              # success_flow: "replicate-primary-writes-to-backup-queue"
              failure_queue: "primary-failure" # failed requests must be replayed on both clusters to avoid dirty data
              invalid_queue: "primary-invalid"
              response_handle:
                retry_rules:
                  default: true
                  retry_429: true
                  retry_4xx: false
                  denied:
                    # status: [ 503 ]
                    keyword:
                      - "illegal_state_exception"
                      # - "503 Service Unavailable"
                      # - '{"error":true,"message":"timeout"}' # timeout, but the request may already have reached es and needs to be redone
          - flow: # non-bulk requests continue with the response check
              flows:
                - primary-response-check
  - name: primary-response-check
    filter:
      - if: # invalid requests
          and:
            - not:
                in:
                  _ctx.response.status: [ 429 ] # between 400 and 500, excluding 429
            - range:
                _ctx.response.status.gte: 400
                _ctx.response.status.lt: 500
        then:
          - queue:
              queue_name: "primary-invalid"
          - drop:
      - if: # successful requests are replicated to the backup cluster
          in:
            _ctx.response.status: [ 200,201 ]
        then: # we consume the write-ahead log directly
          - drop:
        else: # the cluster was available but the request failed, so a dirty write may exist; put the request into the failure queue so both clusters can replay it later and converge, and keep the translog so a UI can run a three-way check: primary cluster, backup cluster, and local log
          - queue:
              queue_name: "primary-failure"
  - name: primary-failure-primary-post-processing # failure handling for the primary cluster: retry failed requests, commit the ones that succeed, keep retrying the ones that fail, and drop invalid requests
    filter:
      - if:
          not:
            cluster_available: [ "primary" ]
        then:
          - elasticsearch_health_check:
              elasticsearch: "primary"
          - sleep:
              sleep_in_million_seconds: 5000
          - drop:
      - elasticsearch:
          elasticsearch: "primary"
          max_connection_per_node: 1000
          max_retry_times: 0
          refresh:
            enabled: true
            interval: 30s
      - if: # the request failed, keep retrying
          in:
            _ctx.response.status: [ 429,0,500,503 ]
        then:
          - drop:
      - bulk_response_process: # the bulk request was executed: invalid items are queued and the rest keep retrying; commit (and hand the request over to the backup) only when all items succeed or all fail, otherwise stop early
          failure_queue: "primary-failure" # kept for inspecting the message log
          invalid_queue: "primary-invalid" # kept for inspecting the message log
          tag_on_any_error: [ "commit_message_allowed" ] # commit on any error, because retryable items have already been pushed back into the failure_queue
          tag_on_all_invalid: [ "commit_message_allowed" ] # commit only when all items are invalid
          tag_on_all_success: [ "commit_message_allowed" ] # commit only when all items succeed
          continue_on_all_error: true # the overall bulk response is not 200, hand over to the following filters
          continue_on_any_error: true # some items failed; it may be all of them or only a part
          retry_rules:
            default: true
            retry_429: true # 429 errors must be retried
            retry_4xx: false
            denied:
              status: [ ]
              keyword:
                - "illegal_state_exception"
      - if: # handle the remaining non-bulk requests, invalid ones first; if the primary cluster already rejected them, the backup cluster does not need to be considered
          and:
            - not:
                in:
                  _ctx.response.status: [ 429 ] # between 400 and 500, excluding 429
            - range:
                _ctx.response.status.gte: 400
                _ctx.response.status.lt: 500
        then:
          - tag:
              add: [ "commit_message_allowed" ]
          - queue:
              queue_name: "primary-invalid"
          - drop:
      - if:
          in:
            _ctx.response.status: [ 200,201 ]
        then:
          - tag:
              add: [ "commit_message_allowed" ]
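  # The two backup-* flows below are not attached to the router directly; they are
  # replayed by the consumer pipelines defined further down: one consumes the
  # primary_write_ahead_log queues and reshuffles the recorded requests into
  # per-partition queues for the backup cluster, the other replays the reshuffled
  # non-bulk requests against the "backup" cluster (bulk traffic is ingested by a
  # separate bulk_indexing pipeline).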
  - name: backup-flow-request-reshuffle
    filter:
      - flow:
          flows:
            - set-auth-for-backup-flow
      - rewrite_to_bulk: # rewrite single-document operations into bulk requests
          type_removed: true # remove the `_doc` type for newer versions of Elasticsearch
      - request_reshuffle: # reshuffle non-bulk requests
          elasticsearch: "backup"
          queue_name_prefix: "request_reshuffle"
          partition_size: $[[env.REQUEST_RESHUFFLE_PARTITION_SIZE]]
          tag_on_success: [ "commit_message_allowed" ]
      - bulk_reshuffle: # reshuffle bulk requests
          when:
            contains:
              _ctx.request.path: /_bulk
          elasticsearch: "backup"
          queue_name_prefix: "async_bulk"
          level: $[[env.BULK_RESHUFFLE_LEVEL]] #cluster,node,index,shard
          partition_size: $[[env.BULK_RESHUFFLE_PARTITION_SIZE]]
          fix_null_id: true
          index_stats_analysis: false
          action_stats_analysis: false
          tag_on_success: [ "commit_message_allowed" ]
  - name: backup-flow-replicate-processing
    filter:
      - if:
          not:
            cluster_available: [ "backup" ]
        then:
          - elasticsearch_health_check:
              elasticsearch: "backup"
          - sleep:
              sleep_in_million_seconds: 5000
          - drop:
      - flow:
          flows:
            - set-auth-for-backup-flow
      - elasticsearch:
          elasticsearch: "backup"
          max_retry_times: 0
          max_connection_per_node: 1000
          refresh:
            enabled: true
            interval: 30s
      - bulk_response_process: # if some requests fail, save the related messages to a queue and stop; do not continue with subsequent processing
          invalid_queue: "backup-invalid"
          tag_on_all_invalid: [ "commit_message_allowed" ]
          tag_on_all_success: [ "commit_message_allowed" ]
          continue_on_all_error: true # the overall bulk response is not 200, hand over to the following filters
          retry_rules:
            default: true
            retry_429: true
            retry_4xx: false
            denied:
              status: [ ]
              keyword:
                - "illegal_state_exception"
          when:
            contains:
              _ctx.request.path: /_bulk
      - if: # invalid requests
          and:
            - not:
                in:
                  _ctx.response.status: [ 429 ] # between 400 and 500, excluding 429
            - range:
                _ctx.response.status.gte: 400
                _ctx.response.status.lt: 500
        then:
          - queue:
              queue_name: "backup-invalid"
          - tag:
              add: [ "commit_message_allowed" ] # invalid requests are not processed further; commit and move on
          - drop:
      - if:
          in:
            _ctx.response.status: [ 200,201 ]
        then:
          - tag:
              add: [ "commit_message_allowed" ]
  - name: tracing-flow # this flow is used for request tracing, refer to `router`'s `tracing_flow`
    filter:
      - queue: # handle dirty writes, final-commit
          queue_name: "primary_final_commit_log##$[[partition_id]]"
          labels:
            type: "primary_final_commit_log"
            partition_id: "$[[partition_id]]"
          message: "$[[_ctx.request.header.X-Replicated-ID]]#$[[_ctx.request.header.LAST_PRODUCED_MESSAGE_OFFSET]]#$[[_sys.unix_timestamp_of_now]]"
          when:
            equals:
              _ctx.request.header.X-Replicated: "true"
      - context_filter:
          context: _ctx.request.path
          exclude:
            - /favicon.ico
      - logging:
          queue_name: request_logging
          max_request_body_size: 1024
          max_response_body_size: 1024
          when:
            or:
              - equals:
                  _ctx.response.header.X-BulkRequest-Failed: "true"
              - not:
                  in:
                    _ctx.response.status: [ 200,201,404 ]
  - name: deny_flow # this flow rejects requests that are not allowed, see the `router` rules below
    filter:
      - set_response:
          body: "request not allowed"
          status: 500

router:
  - name: my_router
    default_flow: primary-write-flow
    tracing_flow: tracing-flow
    rule_toggle_enabled: true
    rules:
      - method:
          - "GET"
          - "HEAD"
        pattern:
          - "/{any:*}"
        enabled: true
        flow:
          - primary-read-flow
      - method:
          - "*"
        enabled: true
        pattern:
          - "/_cat"
          - "/_sql"
          - "/_cluster"
          - "/_refresh"
          - "/_count"
          - "/_search"
          - "/_msearch"
          - "/_mget"
          - "/{any_index}/_eql/search"
          - "/{any_index}/_count"
          - "/{any_index}/_search"
          - "/{any_index}/_msearch"
          - "/{any_index}/_mget"
        flow:
          - primary-read-flow
      - method:
          - "*"
        enabled: $[[env.ENFORCE_BULK_REQUESTS_ONLY]]
        pattern:
          - "/_reindex"
          - "/_delete_by_query"
          - "/_update_by_query"
          - "/{any_index}/_reindex"
          - "/{any_index}/_delete_by_query"
          - "/{any_index}/_update_by_query"
        flow:
          - deny_flow
      - method:
          - "DELETE"
        enabled: $[[env.ENFORCE_BULK_REQUESTS_ONLY]]
        pattern:
          - "/{any_index}"
          - "/{any_index}/{any_type}"
        flow:
          - deny_flow
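# Routing example (assumption: the gateway is listening on the default GATEWAY_HOST
# above and the sample credentials are unchanged; `batch.ndjson` is a placeholder):
#   curl -u elastic:elastic "http://localhost:18000/myindex/_search"            # GET -> primary-read-flow
#   curl -u elastic:elastic -XPOST "http://localhost:18000/_bulk" \
#        -H 'Content-Type: application/x-ndjson' --data-binary @batch.ndjson    # write -> primary-write-flow (default_flow)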
elasticsearch:
  - name: primary
    enabled: true
    endpoints: $[[env.PRIMARY_ENDPOINTS]]
    basic_auth:
      username: $[[env.PRIMARY_USERNAME]]
      password: $[[env.PRIMARY_PASSWORD]]
    traffic_control:
      max_qps_per_node: $[[env.PRIMARY_MAX_QPS_PER_NODE]]
      max_bytes_per_node: $[[env.PRIMARY_MAX_BYTES_PER_NODE]]
      max_connection_per_node: $[[env.PRIMARY_MAX_CONNECTION_PER_NODE]]
    discovery:
      enabled: $[[env.PRIMARY_DISCOVERY_ENABLED]]
      refresh:
        enabled: $[[env.PRIMARY_DISCOVERY_REFRESH_ENABLED]]
        interval: 60s
  - name: backup
    enabled: true
    endpoints: $[[env.BACKUP_ENDPOINTS]]
    basic_auth:
      username: $[[env.BACKUP_USERNAME]]
      password: $[[env.BACKUP_PASSWORD]]
    traffic_control:
      max_qps_per_node: $[[env.BACKUP_MAX_QPS_PER_NODE]]
      max_bytes_per_node: $[[env.BACKUP_MAX_BYTES_PER_NODE]]
      max_connection_per_node: $[[env.BACKUP_MAX_CONNECTION_PER_NODE]]
    discovery:
      enabled: $[[env.BACKUP_DISCOVERY_ENABLED]]
      refresh:
        enabled: $[[env.BACKUP_DISCOVERY_REFRESH_ENABLED]]
        interval: 60s
  - name: logging
    enabled: true
    endpoints: $[[env.LOGGING_ENDPOINTS]]
    basic_auth:
      username: $[[env.LOGGING_USERNAME]]
      password: $[[env.LOGGING_PASSWORD]]
    traffic_control:
      max_qps_per_node: $[[env.LOGGING_MAX_QPS_PER_NODE]]
      max_bytes_per_node: $[[env.LOGGING_MAX_BYTES_PER_NODE]]
      max_connection_per_node: $[[env.LOGGING_MAX_CONNECTION_PER_NODE]]
    discovery:
      enabled: $[[env.LOGGING_DISCOVERY_ENABLED]]
      refresh:
        enabled: $[[env.LOGGING_DISCOVERY_REFRESH_ENABLED]]
        interval: 60s

pipeline:
  - name: replication_correlation
    auto_start: true
    keep_running: true
    singleton: true
    processor:
      - replication_correlation:
          partition_size: $[[env.REPLICATION_PARTITION_SIZE]]
          elasticsearch: "logging"
          index_name: ".infini_replication_results"

  ## system logging and metrics
  - name: messages_merge_async_bulk_results
    auto_start: true
    keep_running: true
    singleton: true
    processor:
      - consumer:
          queue_selector:
            keys:
              - bulk_result_messages
          consumer:
            group: merge_to_bulk
          processor:
            - merge_to_bulk:
                elasticsearch: "logging"
                index_name: ".infini_async_bulk_results"
                output_queue:
                  name: "merged_async_bulk_results"
                  label:
                    tag: "bulk_logging"
                worker_size: 1
                bulk_size_in_mb: 10
  - name: messages_merge_metrics
    auto_start: true
    keep_running: true
    singleton: true
    processor:
      - consumer:
          queue_selector:
            keys:
              - metrics
          consumer:
            group: merge_to_bulk
          processor:
            - merge_to_bulk:
                elasticsearch: "logging"
                index_name: ".infini_metrics"
                output_queue:
                  name: "merged_metrics"
                  label:
                    tag: "metrics"
                worker_size: 1
                bulk_size_in_mb: 10
  - name: messages_merge_requests_logging
    auto_start: true
    keep_running: true
    singleton: true
    processor:
      - consumer:
          queue_selector:
            keys:
              - request_logging
          consumer:
            group: merge_to_bulk
          processor:
            - merge_to_bulk:
                elasticsearch: "logging"
                index_name: ".infini_requests_logging"
                output_queue:
                  name: "merged_requests_logging"
                  label:
                    tag: "request_logging"
                worker_size: 1
                bulk_size_in_mb: 10
  - name: ingest_merged_requests
    auto_start: true
    keep_running: true
    processor:
      - bulk_indexing:
          # num_of_slices: 3 #runtime slicing
          idle_timeout_in_seconds: 30
          bulk:
            compress: false
            batch_size_in_mb: 20
            batch_size_in_docs: 50000
            #remove_duplicated_newlines: true
            invalid_queue: "invalid_request"
            response_handle:
              bulk_result_message_queue: "system_failure_messages"
              max_request_body_size: 10240
              max_response_body_size: 10240
              save_success_results: false
              max_error_details_count: 5
          consumer:
            client_expired_in_seconds: 30
            fetch_max_messages: 5000
            fetch_max_bytes: 10485760
            fetch_max_wait_ms: 5000
          queues:
            type: merge_to_bulk
          when:
            cluster_available: [ "logging" ]
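  # The pipelines above merge metrics, request logs, and bulk results from the local
  # queues into bulk requests against the "logging" cluster; the pipelines below
  # drive the primary-failure retries and the replication of the recorded traffic to
  # the backup cluster.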
  # pipelines for primary cluster
  - name: consume-queue_primary-dead_retry-to-primary
    auto_start: false
    keep_running: false
    singleton: true
    retry_delay_in_ms: 5000
    processor:
      - flow_runner:
          input_queue: "primary-deadletter_requests"
          flow: primary-failure-primary-post-processing
          commit_on_tag: "commit_message_allowed"
          # skip_empty_queue: false
          when:
            cluster_available: [ "primary" ]
  - name: consume-queue_primary-failure-to-primary
    auto_start: true
    keep_running: true
    singleton: true
    retry_delay_in_ms: 0
    processor:
      - flow_runner:
          input_queue: "primary-failure"
          flow: primary-failure-primary-post-processing
          commit_on_tag: "commit_message_allowed"
          consumer:
            group: primary
            name: primary-failure
            fetch_max_messages: 5000
            fetch_max_bytes: 10485760
            fetch_max_wait_ms: 5000
          when:
            cluster_available: [ "primary" ]
  - name: backup-flow-request-reshuffle
    auto_start: true
    keep_running: true
    singleton: true
    retry_delay_in_ms: 10
    processor:
      - consumer:
          max_worker_size: 100
          queue_selector:
            labels:
              type: "primary_write_ahead_log"
          consumer:
            group: request-reshuffle
            fetch_max_messages: 10000
            fetch_max_bytes: 20485760
            fetch_max_wait_ms: 10000
          processor:
            - flow_replay:
                flow: backup-flow-request-reshuffle
                commit_on_tag: "commit_message_allowed"
  - name: consume-queue_request-reshuffle
    auto_start: true
    keep_running: true
    singleton: true
    retry_delay_in_ms: 5000
    processor:
      - consumer:
          max_worker_size: 100
          queue_selector:
            labels:
              type: request_reshuffle
          consumer:
            group: flow_replay
            fetch_max_messages: 1000
            fetch_max_bytes: 10485760
            fetch_max_wait_ms: 1000
          processor:
            - flow_replay:
                flow: backup-flow-replicate-processing
                commit_on_tag: "commit_message_allowed"
  - name: consume-queue_backup-bulk_request_ingestion-to-backup
    auto_start: true
    keep_running: true
    singleton: true
    processor:
      - bulk_indexing:
          bulk:
            #compress: true
            batch_size_in_mb: 20
            batch_size_in_docs: 10000
            output_bulk_stats: true
            response_handle:
              save_success_results: true
              save_error_results: true
              save_busy_results: true
              include_index_stats: true
              include_action_stats: true
              include_error_details: true
              bulk_result_message_queue: "bulk_result_messages"
              max_request_body_size: 10240
              max_response_body_size: 10240
              max_error_details_count: 5
          consumer:
            fetch_max_messages: 5000
            fetch_max_bytes: 10485760 #10MB
            fetch_max_wait_ms: 10000
            client_expired_in_seconds: 30
          max_worker_size: 200
          # num_of_slices: 10
          idle_timeout_in_seconds: 30
          queues:
            type: bulk_reshuffle
          when:
            cluster_available: [ "backup" ]

## let's upload disk queue files to s3 for backup
disk_queue:
  compress:
    segment:
      enabled: $[[env.DISK_QUEUE_COMPRESS_SEGMENT_ENABLED]]
  retention:
    max_num_of_local_files: $[[env.DISK_QUEUE_RETENTION_MAX_NUM_OF_LOCAL_FILES]]
  upload_to_s3: $[[env.DISK_QUEUE_UPLOAD_TO_S3]]
  s3:
    server: $[[env.S3_SERVER_NAME]]
    location: $[[env.S3_LOCATION]]
    bucket: $[[env.S3_BUCKET_NAME]]

s3:
  $[[env.S3_SERVER_NAME]]:
    endpoint: $[[env.S3_HOST]]
    access_key: $[[env.S3_ACCESS_KEY]]
    access_secret: $[[env.S3_ACCESS_SECRET]]

elastic:
  enabled: true
  remote_configs: false
  elasticsearch: logging #TODO: change to variable
  health_check:
    enabled: $[[env.ELASTIC_HEALTH_CHECK_ENABLED]]
    interval: 30s
  availability_check:
    enabled: $[[env.ELASTIC_AVAILABILITY_CHECK_ENABLED]]
    interval: 60s
  metadata_refresh:
    enabled: $[[env.ELASTIC_METADATA_REFRESH_ENABLED]]
    interval: 30s
  cluster_settings_check:
    enabled: $[[env.ELASTIC_CLUSTER_SETTINGS_CHECK_ENABLED]]
    interval: 20s
  discovery:
    enabled: true
    refresh:
      enabled: true
      interval: 30s

##metrics
metrics:
  enabled: $[[env.METRICS_ENABLED]]
  queue: metrics
  instance:
    enabled: true
  network:
    enabled: true
    summary: true
    sockets: true
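#floating_ip
# Optional virtual-IP failover between two gateway instances, disabled here via
# FLOATING_IP_ENABLED. (A hedged note: the interface/netmask/priority settings below
# follow the usual active/standby convention; check the gateway documentation for the
# exact election behavior.)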
floating_ip:
  enabled: $[[env.FLOATING_IP_ENABLED]]
  ip: $[[env.FLOATING_IP_ADDRESS]]
  netmask: $[[env.FLOATING_IP_MASK]]
  interface: $[[env.FLOATING_IP_INTERFACE]]
  priority: 100
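# A minimal way to try this file (assumption: the INFINI Gateway binary is named
# `gateway` and accepts a `-config` flag; the file name `replication.yml` is just a
# placeholder for wherever this configuration is saved):
#   ./gateway -config replication.yml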