# Copyright (c) 2014-2020 Snowplow Analytics Ltd. All rights reserved.
#
# This program is licensed to you under the Apache License Version 2.0, and
# you may not use this file except in compliance with the Apache License
# Version 2.0. You may obtain a copy of the Apache License Version 2.0 at
# http://www.apache.org/licenses/LICENSE-2.0.
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the Apache License Version 2.0 is distributed on an "AS
# IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied. See the Apache License Version 2.0 for the specific language
# governing permissions and limitations there under.

# This file (config.hocon.sample) contains a template with
# configuration options for the Elasticsearch Loader.

# Sources currently supported are:
# "kinesis" for reading records from a Kinesis stream
# "stdin" for reading unencoded tab-separated events from stdin
#    If set to "stdin", JSON documents will not be sent to Elasticsearch
#    but will be written to stdout.
# "nsq" for reading unencoded tab-separated events from NSQ
source = {{elasticsearchInputType}}

# Where to write good and bad records
sink {
  # Sinks currently supported are:
  # "elasticsearch" for writing good records to Elasticsearch
  # "stdout" for writing good records to stdout
  good = "{{elasticsearchGoodOutputDestination}}"

  # Sinks currently supported are:
  # "kinesis" for writing bad records to Kinesis
  # "stderr" for writing bad records to stderr
  # "nsq" for writing bad records to NSQ
  # "none" for ignoring bad records
  bad = "{{elasticsearchBadOutputDestination}}"
}

# "good" for a stream of successfully enriched events
# "bad" for a stream of bad events
# "plain-json" for writing plain JSON
enabled = "{{kinesisInEnabledStreamType}}"

# The following are used to authenticate for the Amazon Kinesis sink.
#
# If both are set to "default", the default provider chain is used
# (see http://docs.aws.amazon.com/AWSJavaSDK/latest/javadoc/com/amazonaws/auth/DefaultAWSCredentialsProviderChain.html)
#
# If both are set to "iam", use AWS IAM Roles to provision credentials.
#
# If both are set to "env", use the environment variables AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY
aws {
  accessKey = iam
  secretKey = iam
}

queue {
  # Which queue to use, can be "kinesis" or "nsq"
  enabled = "kinesis"

  # Config for Kinesis

  # "LATEST": most recent data.
  # "TRIM_HORIZON": oldest available data.
  # "AT_TIMESTAMP": start from the record at or after the specified timestamp.
  # Note: this only affects the first run of this application on a stream.
  initialPosition = "{{initialPosition}}"

  # Must be specified when initialPosition is "AT_TIMESTAMP".
  # The timestamp must be in the format "yyyy-MM-ddTHH:mm:ssZ",
  # e.g. "2017-05-17T10:00:00Z".
  # Note: the time must be specified in UTC.
  initialTimestamp = "{{initialTimestamp}}"
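  # For example (illustrative values only, not defaults), to replay a stream
  # from a fixed point in time on the first run of the application:
  #   initialPosition = "AT_TIMESTAMP"
  #   initialTimestamp = "2020-06-01T00:00:00Z"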
  # Maximum number of records to get from Kinesis per call to GetRecords
  maxRecords = {{kinesisMaxRecords}}

  # Region where the Kinesis stream is located
  region = "{{kinesisRegion}}"

  # Optional endpoint URL configuration to override AWS Kinesis endpoints;
  # this can be used to specify local endpoints when using Localstack
  # customEndpoint = {{kinesisEndpoint}}
  # customEndpoint = ${?ENRICH_STREAMS_SOURCE_SINK_CUSTOM_ENDPOINT}

  # Optional endpoint URL configuration to override AWS DynamoDB endpoints for the Kinesis checkpoints lease table;
  # this can be used to specify local endpoints when using Localstack
  # dynamodbCustomEndpoint = "http://localhost:4569"
  # dynamodbCustomEndpoint = ${?ENRICH_DYNAMODB_CUSTOM_ENDPOINT}

  # Optional override to disable CloudWatch
  # disableCloudWatch = true
  # disableCloudWatch = ${?ENRICH_DISABLE_CLOUDWATCH}

  # "appName" is used for a DynamoDB table to maintain stream state.
  # You can set it automatically using: "SnowplowElasticsearchSink-${sink.kinesis.in.stream-name}"
  appName = "{{kinesisAppName}}"

  # Config for NSQ

  # Channel name for the NSQ source
  # If more than one application reads from the same NSQ topic at the same time,
  # each of them must have a unique channel name in order to receive all the
  # data from that topic
  #channelName = "{{nsqSourceChannelName}}"

  # Host name for nsqd
  #nsqdHost = "{{nsqdHost}}"

  # HTTP port for nsqd
  #nsqdPort = {{nsqdPort}}

  # Host name for nsqlookupd
  #nsqlookupdHost = "{{nsqlookupdHost}}"

  # HTTP port for nsqlookupd
  #nsqlookupdPort = {{nsqlookupdPort}}
}

# Common configuration section for all stream sources
streams {
  # Stream to read enriched events from
  inStreamName = "{{inStreamName}}"

  # Stream for enriched events which are rejected by Elasticsearch
  outStreamName = "{{outStreamName}}"

  # Events are accumulated in a buffer before being sent to Elasticsearch.
  # The buffer is emptied whenever:
  # - the combined size of the stored records exceeds byteLimit, or
  # - the number of stored records exceeds recordLimit, or
  # - the time in milliseconds since it was last emptied exceeds timeLimit
  buffer {
    byteLimit = {{elasticsearchBufferByteThreshold}} # Not supported by NSQ; will be ignored
    recordLimit = {{elasticsearchBufferRecordThreshold}}
    timeLimit = {{elasticsearchBufferTimeThreshold}} # Not supported by NSQ; will be ignored
  }
}
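# For example (illustrative values only, not defaults), a buffer that is
# flushed after 5 MB, 10000 records, or 60 seconds, whichever limit is
# reached first:
#   buffer {
#     byteLimit = 5242880
#     recordLimit = 10000
#     timeLimit = 60000
#   }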
elasticsearch {

  # Events are indexed using an Elasticsearch Client
  # - endpoint: the cluster endpoint
  # - port: the port the cluster can be accessed on
  #   - for http this is usually 9200
  #   - for transport this is usually 9300
  # - username (optional, remove if not active): http basic auth username
  # - password (optional, remove if not active): http basic auth password
  # - shardDateFormat (optional, remove if not needed): formatting used for sharding good stream, e.g. "_yyyy-MM-dd"
  # - shardDateField (optional, if not specified derived_tstamp is used): timestamp field for sharding good stream
  # - maxTimeout: the maximum attempt time before a client restart
  # - maxRetries: the maximum number of retry attempts
  # - ssl: if using the http client, whether to use ssl or not
  client {
    endpoint = "{{elasticsearchEndpoint}}"
    port = "{{elasticsearchTransportPort}}"
    username = "{{elasticsearchUsername}}"
    password = "{{elasticsearchPassword}}"
    shardDateFormat = "{{elasticsearchShardDateFormat}}"
    shardDateField = "{{elasticsearchShardDateField}}"
    maxTimeout = "{{elasticsearchMaxTimeout}}"
    maxRetries = {{elasticsearchMaxRetries}}
    ssl = {{sslBool}}
  }

  # When using the AWS ES service
  # - signing: if using the http client and the AWS ES service you can sign your requests
  #   http://docs.aws.amazon.com/general/latest/gr/signing_aws_api_requests.html
  # - region: where the AWS ES service is located
  aws {
    signing = {{signingBool}}
    region = "{{elasticsearchRegion}}"
  }

  # name: the Elasticsearch cluster name
  # index: the Elasticsearch index name
  # documentType: the Elasticsearch index type
  cluster {
    name = "{{elasticsearchClusterName}}"
    index = "{{elasticsearchIndex}}"
    documentType = "{{elasticsearchType}}"
  }
}

# Optional section for tracking endpoints
monitoring {
  snowplow {
    collectorUri = "{{collectorUri}}"
    collectorPort = "{{collectorPort}}"
    ssl = "{{sslMonitoring}}"
    appId = "{{appId}}"
    method = "{{method}}"
  }
}
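# For example (illustrative values only, not defaults), an http client section
# for a local unsecured Elasticsearch node, with the optional basic auth and
# sharding settings removed, might look like:
#   client {
#     endpoint = "localhost"
#     port = "9200"
#     maxTimeout = "10000"
#     maxRetries = 6
#     ssl = false
#   }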