Outputs

This document was generated with benthos --list-outputs

An output is a sink where we wish to send our consumed data after applying an optional array of processors. Only one output is configured at the root of a Benthos config. However, the output can be a broker which combines multiple outputs under a chosen brokering pattern.

An output config section looks like this:

output:
  s3:
    bucket: TODO
    path: "${!metadata:kafka_topic}/${!json_field:message.id}.json"

  # Optional list of processing steps
  processors:
  - jmespath:
      query: '{ message: @, meta: { link_count: length(links) } }'

Back Pressure

Benthos outputs apply back pressure to components upstream. This means if your output target starts blocking traffic Benthos will gracefully stop consuming until the issue is resolved.

Retries

When a Benthos output fails to send a message the error is propagated back up to the input, where depending on the protocol it will either be pushed back to the source as a Noack (e.g. AMQP) or will be reattempted indefinitely with the commit withheld until success (e.g. Kafka).

It's possible to instead have Benthos indefinitely retry an output until success with a retry output. Some other outputs, such as the broker, might also retry indefinitely depending on their configuration.

Multiplexing Outputs

It is possible to perform content-based multiplexing of messages to specific outputs, either by using the switch output or by using a broker with the fan_out pattern and a filter processor on each output (a processor that drops messages when a condition does not pass). Conditions are content aware logical operators that can be combined using boolean logic.

For more information regarding conditions, including a full list of available conditions please read the docs here.
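
As a rough sketch of the broker approach, using hypothetical outputs foo and bar and a text condition on each filter:

output:
  broker:
    pattern: fan_out
    outputs:
    - foo:
        foo_field_1: value1
      # Only messages containing "foo" reach this output.
      processors:
      - filter:
          text:
            operator: contains
            arg: foo
    - bar:
        bar_field_1: value2
      # Only messages containing "bar" reach this output.
      processors:
      - filter:
          text:
            operator: contains
            arg: bar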

Dead Letter Queues

It's possible to create fallback outputs for when an output target fails using a try output.

Contents

  1. amqp_0_9
  2. broker
  3. cache
  4. drop
  5. drop_on_error
  6. dynamic
  7. dynamodb
  8. elasticsearch
  9. file
  10. files
  11. gcp_pubsub
  12. hdfs
  13. http_client
  14. http_server
  15. inproc
  16. kafka
  17. kinesis
  18. kinesis_firehose
  19. mqtt
  20. nanomsg
  21. nats
  22. nats_stream
  23. nsq
  24. redis_hash
  25. redis_list
  26. redis_pubsub
  27. redis_streams
  28. retry
  29. s3
  30. sns
  31. socket
  32. sqs
  33. stdout
  34. switch
  35. sync_response
  36. try
  37. websocket

amqp_0_9

type: amqp_0_9
amqp_0_9:
  exchange: benthos-exchange
  exchange_declare:
    durable: true
    enabled: false
    type: direct
  immediate: false
  key: benthos-key
  mandatory: false
  max_in_flight: 1
  persistent: false
  tls:
    client_certs: []
    enabled: false
    root_cas_file: ""
    skip_cert_verify: false
  url: amqp://guest:guest@localhost:5672/

Sends messages to an AMQP (0.9.1) exchange. AMQP is a messaging protocol used by various message brokers, including RabbitMQ. The metadata from each message is delivered as headers.

It's possible for this output type to create the target exchange by setting exchange_declare.enabled to true. If the exchange already exists then the declaration passively verifies that the settings match.

Exchange type options are: direct|fanout|topic|x-custom

TLS is automatic when connecting to an amqps URL, but custom settings can be enabled in the tls section.

The field 'key' can be dynamically set using function interpolations described here.
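
For example, the routing key could be derived from a metadata field per message (the metadata key kafka_key here is purely illustrative):

output:
  amqp_0_9:
    url: amqp://guest:guest@localhost:5672/
    exchange: benthos-exchange
    # Routing key resolved per message via interpolation.
    key: ${!metadata:kafka_key}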

Performance

This output benefits from sending multiple messages in flight in parallel for improved performance. You can tune the max number of in flight messages with the field max_in_flight.


broker

type: broker
broker:
  batching:
    byte_size: 0
    condition:
      type: static
      static: false
    count: 1
    period: ""
  copies: 1
  outputs: []
  pattern: fan_out

The broker output type allows you to configure multiple output targets by listing them:

output:
  broker:
    pattern: fan_out
    outputs:
    - foo:
        foo_field_1: value1
    - bar:
        bar_field_1: value2
        bar_field_2: value3
    - baz:
        baz_field_1: value4
      # Processors only applied to messages sent to baz.
      processors:
      - type: baz_processor

  # Processors applied to messages sent to all brokered outputs.
  processors:
  - type: some_processor

The broker pattern determines the way in which messages are allocated to outputs and can be chosen from the following:

fan_out

With the fan out pattern all outputs will be sent every message that passes through Benthos in parallel.

If an output applies back pressure it will block all subsequent messages, and if an output fails to send a message it will be retried continuously until completion or service shut down.

fan_out_sequential

Similar to the fan out pattern except outputs are written to sequentially, meaning an output is only written to once the preceding output has confirmed receipt of the same message.

round_robin

With the round robin pattern each message will be assigned a single output following their order. If an output applies back pressure it will block all subsequent messages. If an output fails to send a message then the message will be re-attempted with the next output, and so on.

greedy

The greedy pattern results in higher output throughput at the cost of potentially disproportionate message allocations to those outputs. Each message is sent to a single output, which is determined by allowing outputs to claim messages as soon as they are able to process them. This results in certain faster outputs potentially processing more messages at the cost of slower outputs.

try

The try pattern attempts to send each message to only one output, starting from the first output on the list. If an output attempt fails then the broker attempts to send to the next output in the list and so on.

This pattern is useful for triggering events in the case where certain output targets have broken. For example, if you had an output type http_client but wished to reroute messages whenever the endpoint becomes unreachable you could use a try broker.

Batching

It's possible to configure a batch policy with a broker using the batching fields, allowing you to create batches after your processing stages. Some outputs do not support broker based batching and specify this in their documentation.
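
As a sketch (the child outputs foo and bar are hypothetical), a batch policy applied at the broker level might look like this:

output:
  broker:
    pattern: fan_out
    # Flush a batch at 50 messages or after 500ms, whichever comes first.
    batching:
      count: 50
      period: 500ms
    outputs:
    - type: foo
    - type: bar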

Utilising More Outputs

When using brokered outputs with patterns such as round robin or greedy it is possible to have multiple messages in-flight at the same time. In order to fully utilise this you either need to have a greater number of input sources than output sources or use a buffer.

Processors

It is possible to configure processors at the broker level, where they will be applied to all child outputs, as well as on the individual child outputs. If you have processors at both the broker level and on child outputs then the broker processors will be applied before the child nodes processors.


cache

type: cache
cache:
  key: ${!count:items}-${!timestamp_unix_nano}
  max_in_flight: 1
  target: ""

Stores messages in a cache. Caches are configured within the resources section and can target any supported cache type, for example:

output:
  cache:
    target: foo
    key: ${!json_field:document.id}

resources:
  caches:
    foo:
      memcached:
        addresses:
          - localhost:11211
        ttl: 60

In order to create a unique key value per item you should use function interpolations described here. When sending batched messages the interpolations are performed per message part.

Performance

This output benefits from sending multiple messages in flight in parallel for improved performance. You can tune the max number of in flight messages with the field max_in_flight.


drop

type: drop
drop: {}

Drops all messages.


drop_on_error

type: drop_on_error
drop_on_error: {}

Attempts to write messages to a child output and if the write fails for any reason the message is dropped instead of being reattempted. This output can be combined with a child retry output in order to set an explicit number of retry attempts before dropping a message.

For example, the following configuration attempts to send to a hypothetical output type foo three times, but if all three attempts fail the message is dropped entirely:

output:
  drop_on_error:
    retry:
      max_retries: 2
      output:
        type: foo

dynamic

type: dynamic
dynamic:
  outputs: {}
  prefix: ""
  timeout: 5s

The dynamic type is a special broker type where the outputs are identified by unique labels and can be created, changed and removed during runtime via a REST HTTP interface. The broker pattern used is always fan_out, meaning each message will be delivered to each dynamic output.

To GET a JSON map of output identifiers with their current uptimes use the '/outputs' endpoint.

To perform CRUD actions on the outputs themselves use POST, DELETE, and GET methods on the /outputs/{output_id} endpoint. When using POST the body of the request should be a JSON configuration for the output. If the output already exists it will be changed.


dynamodb

type: dynamodb
dynamodb:
  backoff:
    initial_interval: 1s
    max_elapsed_time: 30s
    max_interval: 5s
  batching:
    byte_size: 0
    condition:
      type: static
      static: false
    count: 1
    period: ""
  credentials:
    id: ""
    profile: ""
    role: ""
    role_external_id: ""
    secret: ""
    token: ""
  endpoint: ""
  json_map_columns: {}
  max_in_flight: 1
  max_retries: 3
  region: eu-west-1
  string_columns: {}
  table: ""
  ttl: ""
  ttl_key: ""

Inserts items into a DynamoDB table.

The field string_columns is a map of column names to string values, where the values are function interpolated per message of a batch. This allows you to populate string columns of an item by extracting fields within the document payload or metadata like follows:

string_columns:
  id: ${!json_field:id}
  title: ${!json_field:body.title}
  topic: ${!metadata:kafka_topic}
  full_content: ${!content}

The field json_map_columns is a map of column names to json paths, where the dot path is extracted from each document and converted into a map value. Both an empty path and the path . are interpreted as the root of the document. This allows you to populate map columns of an item like follows:

json_map_columns:
  user: path.to.user
  whole_document: .

A column name can be empty:

json_map_columns:
  "": .

In which case the top level document fields will be written at the root of the item, potentially overwriting previously defined column values. If a path is not found within a document the column will not be populated.
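
Putting these together, a config mixing explicit string columns with map columns might look like the following sketch (the table name and paths are illustrative):

output:
  dynamodb:
    table: foo_table
    region: eu-west-1
    # String attributes populated via interpolation per message.
    string_columns:
      id: ${!json_field:id}
      topic: ${!metadata:kafka_topic}
    # Map attributes extracted from dot paths of the document.
    json_map_columns:
      meta: path.to.meta
      whole_document: .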

Credentials

By default Benthos will use a shared credentials file when connecting to AWS services. It's also possible to set them explicitly at the component level, allowing you to transfer data across accounts. You can find out more in this document.

Performance

This output benefits from sending multiple messages in flight in parallel for improved performance. You can tune the max number of in flight messages with the field max_in_flight.

This output benefits from sending messages as a batch for improved performance. Batches can be formed at both the input and output level. You can find out more in this doc.


elasticsearch

type: elasticsearch
elasticsearch:
  aws:
    credentials:
      id: ""
      profile: ""
      role: ""
      role_external_id: ""
      secret: ""
      token: ""
    enabled: false
    endpoint: ""
    region: eu-west-1
  backoff:
    initial_interval: 1s
    max_elapsed_time: 30s
    max_interval: 5s
  basic_auth:
    enabled: false
    password: ""
    username: ""
  batching:
    byte_size: 0
    condition:
      type: static
      static: false
    count: 1
    period: ""
  healthcheck: true
  id: ${!count:elastic_ids}-${!timestamp_unix}
  index: benthos_index
  max_in_flight: 1
  max_retries: 0
  pipeline: ""
  sniff: true
  timeout: 5s
  type: doc
  urls:
  - http://localhost:9200

Publishes messages into an Elasticsearch index. If the index does not exist then it is created with a dynamic mapping.

Both the id and index fields can be dynamically set using function interpolations described here. When sending batched messages these interpolations are performed per message part.
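
For example, the index could be taken from metadata and the document ID from the payload (the metadata key and JSON path here are assumptions for illustration):

output:
  elasticsearch:
    urls:
    - http://localhost:9200
    # Resolved per message of a batch.
    index: ${!metadata:kafka_topic}
    id: ${!json_field:doc.id}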

AWS Credentials

By default Benthos will use a shared credentials file when connecting to AWS services. It's also possible to set them explicitly at the component level, allowing you to transfer data across accounts. You can find out more in this document.

If the configured target is a managed AWS Elasticsearch cluster, you may need to set sniff and healthcheck to false for connections to succeed.

Performance

This output benefits from sending multiple messages in flight in parallel for improved performance. You can tune the max number of in flight messages with the field max_in_flight.

This output benefits from sending messages as a batch for improved performance. Batches can be formed at both the input and output level. You can find out more in this doc.


file

type: file
file:
  delimiter: ""
  path: ""

The file output type simply appends all messages to an output file. Single part messages are printed with a delimiter (defaults to '\n' if left empty). Multipart messages are written with each part delimited, with the final part followed by two delimiters, e.g. a multipart message [ "foo", "bar", "baz" ] would be written as:

foo\n
bar\n
baz\n\n


files

type: files
files:
  path: ${!count:files}-${!timestamp_unix_nano}.txt

Writes each individual part of each message to a new file.

Message parts only contain raw data, and therefore in order to create a unique file for each part you need to generate unique file names. This can be done by using function interpolations on the path field as described here. When sending batched messages these interpolations are performed per message part.


gcp_pubsub

type: gcp_pubsub
gcp_pubsub:
  max_in_flight: 1
  project: ""
  topic: ""

Sends messages to a GCP Cloud Pub/Sub topic. Metadata from messages are sent as attributes.

Performance

This output benefits from sending multiple messages in flight in parallel for improved performance. You can tune the max number of in flight messages with the field max_in_flight.


hdfs

type: hdfs
hdfs:
  directory: ""
  hosts:
  - localhost:9000
  max_in_flight: 1
  path: ${!count:files}-${!timestamp_unix_nano}.txt
  user: benthos_hdfs

Sends message parts as files to an HDFS directory. Each file is written with the path specified by the path field; in order to have a different path for each object you should use function interpolations described here. When sending batched messages the interpolations are performed per message part.

Performance

This output benefits from sending multiple messages in flight in parallel for improved performance. You can tune the max number of in flight messages with the field max_in_flight.


http_client

type: http_client
http_client:
  backoff_on:
  - 429
  basic_auth:
    enabled: false
    password: ""
    username: ""
  batching:
    byte_size: 0
    condition:
      type: static
      static: false
    count: 1
    period: ""
  copy_response_headers: false
  drop_on: []
  headers:
    Content-Type: application/octet-stream
  max_in_flight: 1
  max_retry_backoff: 300s
  oauth:
    access_token: ""
    access_token_secret: ""
    consumer_key: ""
    consumer_secret: ""
    enabled: false
    request_url: ""
  propagate_response: false
  rate_limit: ""
  retries: 3
  retry_period: 1s
  successful_on: []
  timeout: 5s
  tls:
    client_certs: []
    enabled: false
    root_cas_file: ""
    skip_cert_verify: false
  url: http://localhost:4195/post
  verb: POST

Sends messages to an HTTP server. The request will be retried for each message whenever the response code falls outside the range 200-299 inclusive. It is possible to list codes outside of this range in the drop_on field in order to prevent retry attempts.

The period of time between retries is linear by default. Response codes that are within the backoff_on list will instead apply exponential backoff between retry attempts.

When the number of retries expires the output will reject the message. The behaviour after this depends on the pipeline, but usually it simply means the send is attempted again until successful whilst applying back pressure.

The URL and header values of this type can be dynamically set using function interpolations described here.

The body of the HTTP request is the raw contents of the message payload. If the message has multiple parts the request will be sent according to RFC1341 (multipart MIME).

Propagating Responses

It's possible to propagate the response from each HTTP request back to the input source by setting propagate_response to true. Only inputs that support synchronous responses are able to make use of these propagated responses.
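
A minimal sketch of this, pairing an http_server input with an http_client output (the downstream URL is hypothetical):

input:
  http_server:
    path: /post
output:
  http_client:
    url: http://localhost:8080/process
    # The HTTP response body is returned to the original /post request.
    propagate_response: true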

Performance

This output benefits from sending multiple messages in flight in parallel for improved performance. You can tune the max number of in flight messages with the field max_in_flight.

This output benefits from sending messages as a batch for improved performance. Batches can be formed at both the input and output level. You can find out more in this doc.


http_server

type: http_server
http_server:
  address: ""
  cert_file: ""
  key_file: ""
  path: /get
  stream_path: /get/stream
  timeout: 5s
  ws_path: /get/ws

Sets up an HTTP server that will send messages over HTTP(S) GET requests. HTTP 2.0 is supported when using TLS, which is enabled when key and cert files are specified.

You can leave the address config field blank in order to use the default service wide server address, but this will ignore TLS options.

Three endpoints will be registered at the paths specified by the fields path, stream_path and ws_path, which allow you to consume a single message batch, a continuous stream of line delimited messages, or a websocket of messages for each request respectively.

When messages are batched the path endpoint encodes the batch according to RFC1341.


inproc

type: inproc
inproc: ""

Sends data directly to Benthos inputs by connecting to a unique ID. This allows you to hook up isolated streams whilst running Benthos in --streams mode. It is NOT recommended that you connect the inputs of a stream with an output of the same stream, as feedback loops can lead to deadlocks in your message flow.

It is possible to connect multiple inputs to the same inproc ID, but only one output can connect to an inproc ID, and will replace existing outputs if a collision occurs.
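
As a sketch, two streams running in --streams mode could be linked through a shared inproc ID (the ID foobar and the stream split shown here are arbitrary):

# Stream one: writes to the shared inproc ID.
output:
  inproc: foobar

# Stream two: reads from the same inproc ID.
input:
  inproc: foobar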


kafka

type: kafka
kafka:
  ack_replicas: false
  addresses:
  - localhost:9092
  backoff:
    initial_interval: 3s
    max_elapsed_time: 30s
    max_interval: 10s
  batching:
    byte_size: 0
    condition:
      type: static
      static: false
    count: 1
    period: ""
  client_id: benthos_kafka_output
  compression: none
  key: ""
  max_in_flight: 1
  max_msg_bytes: 1000000
  max_retries: 0
  partitioner: fnv1a_hash
  sasl:
    enabled: false
    password: ""
    user: ""
  target_version: 1.0.0
  timeout: 5s
  tls:
    client_certs: []
    enabled: false
    root_cas_file: ""
    skip_cert_verify: false
  topic: benthos_stream

The kafka output type writes a batch of messages to Kafka brokers and waits for acknowledgement before propagating it back to the input. The config field ack_replicas determines whether we wait for acknowledgement from all replicas or just a single broker.

It is possible to specify a compression codec to use out of the following options: none, snappy, lz4 and gzip.

Both the key and topic fields can be dynamically set using function interpolations described here. When sending batched messages these interpolations are performed per message part.

The partitioner field determines how messages are delegated to a partition. Options are fnv1a_hash, murmur2_hash, random and round_robin. When a hash partitioner is selected but a message key is empty then a random partition is chosen.
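
For example, the topic and key could both be interpolated per message, with the key then feeding a hash partitioner (the metadata key and JSON path are assumptions for illustration):

output:
  kafka:
    addresses:
    - localhost:9092
    topic: ${!metadata:target_topic}
    # Messages with the same user.id land on the same partition.
    key: ${!json_field:user.id}
    partitioner: murmur2_hash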

TLS

Custom TLS settings can be used to override system defaults. This includes providing a collection of root certificate authorities, providing a list of client certificates to use for client verification and skipping certificate verification.

Client certificates can either be added by file or by raw contents:

enabled: true
client_certs:
  - cert_file: ./example.pem
    key_file: ./example.key
  - cert: foo
    key: bar

Performance

This output benefits from sending multiple messages in flight in parallel for improved performance. You can tune the max number of in flight messages with the field max_in_flight.

This output benefits from sending messages as a batch for improved performance. Batches can be formed at both the input and output level. You can find out more in this doc.


kinesis

type: kinesis
kinesis:
  backoff:
    initial_interval: 1s
    max_elapsed_time: 30s
    max_interval: 5s
  batching:
    byte_size: 0
    condition:
      type: static
      static: false
    count: 1
    period: ""
  credentials:
    id: ""
    profile: ""
    role: ""
    role_external_id: ""
    secret: ""
    token: ""
  endpoint: ""
  hash_key: ""
  max_in_flight: 1
  max_retries: 0
  partition_key: ""
  region: eu-west-1
  stream: ""

Sends messages to a Kinesis stream.

Both the partition_key (required) and hash_key (optional) fields can be dynamically set using function interpolations described here. When sending batched messages the interpolations are performed per message part.
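
For example, the partition key could be taken from the message payload (the JSON path is an assumption for illustration):

output:
  kinesis:
    stream: TODO
    region: eu-west-1
    # Resolved per message of a batch.
    partition_key: ${!json_field:user.id}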

Credentials

By default Benthos will use a shared credentials file when connecting to AWS services. It's also possible to set them explicitly at the component level, allowing you to transfer data across accounts. You can find out more in this document.

Performance

This output benefits from sending multiple messages in flight in parallel for improved performance. You can tune the max number of in flight messages with the field max_in_flight.

This output benefits from sending messages as a batch for improved performance. Batches can be formed at both the input and output level. You can find out more in this doc.


kinesis_firehose

type: kinesis_firehose
kinesis_firehose:
  backoff:
    initial_interval: 1s
    max_elapsed_time: 30s
    max_interval: 5s
  batching:
    byte_size: 0
    condition:
      type: static
      static: false
    count: 1
    period: ""
  credentials:
    id: ""
    profile: ""
    role: ""
    role_external_id: ""
    secret: ""
    token: ""
  endpoint: ""
  max_in_flight: 1
  max_retries: 0
  region: eu-west-1
  stream: ""

Sends messages to a Kinesis Firehose delivery stream.

Credentials

By default Benthos will use a shared credentials file when connecting to AWS services. It's also possible to set them explicitly at the component level, allowing you to transfer data across accounts. You can find out more in this document.

Performance

This output benefits from sending multiple messages in flight in parallel for improved performance. You can tune the max number of in flight messages with the field max_in_flight.

This output benefits from sending messages as a batch for improved performance. Batches can be formed at both the input and output level. You can find out more in this doc.


mqtt

type: mqtt
mqtt:
  client_id: benthos_output
  max_in_flight: 1
  password: ""
  qos: 1
  topic: benthos_topic
  urls:
  - tcp://localhost:1883
  user: ""

Pushes messages to an MQTT broker.

The topic field can be dynamically set using function interpolations described here. When sending batched messages these interpolations are performed per message part.
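
For example, a device identifier from the payload could be interpolated into the topic (the path device.id is illustrative):

output:
  mqtt:
    urls:
    - tcp://localhost:1883
    # Each message is published to its own device topic.
    topic: sensors/${!json_field:device.id}
    qos: 1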

Performance

This output benefits from sending multiple messages in flight in parallel for improved performance. You can tune the max number of in flight messages with the field max_in_flight.


nanomsg

type: nanomsg
nanomsg:
  bind: false
  max_in_flight: 1
  poll_timeout: 5s
  socket_type: PUSH
  urls:
  - tcp://localhost:5556

The scalability protocols are common communication patterns. This output should be compatible with any implementation, but specifically targets Nanomsg.

Currently only PUSH and PUB sockets are supported.

Performance

This output benefits from sending multiple messages in flight in parallel for improved performance. You can tune the max number of in flight messages with the field max_in_flight.


nats

type: nats
nats:
  max_in_flight: 1
  subject: benthos_messages
  urls:
  - nats://127.0.0.1:4222

Publish to a NATS subject. NATS is at-most-once, so delivery is not guaranteed. For at-least-once behaviour with NATS look at NATS Stream.

This output will interpolate functions within the subject field, you can find a list of functions here.

Performance

This output benefits from sending multiple messages in flight in parallel for improved performance. You can tune the max number of in flight messages with the field max_in_flight.


nats_stream

type: nats_stream
nats_stream:
  client_id: benthos_client
  cluster_id: test-cluster
  max_in_flight: 1
  subject: benthos_messages
  urls:
  - nats://127.0.0.1:4222

Publish to a NATS Stream subject.

Performance

This output benefits from sending multiple messages in flight in parallel for improved performance. You can tune the max number of in flight messages with the field max_in_flight.


nsq

type: nsq
nsq:
  max_in_flight: 1
  nsqd_tcp_address: localhost:4150
  topic: benthos_messages
  user_agent: benthos_producer

Publish to an NSQ topic. The topic field can be dynamically set using function interpolations described here. When sending batched messages these interpolations are performed per message part.

Performance

This output benefits from sending multiple messages in flight in parallel for improved performance. You can tune the max number of in flight messages with the field max_in_flight.


redis_hash

type: redis_hash
redis_hash:
  fields: {}
  key: ""
  max_in_flight: 1
  url: tcp://localhost:6379
  walk_json_object: false
  walk_metadata: false

Sets Redis hash objects using the HMSET command.

The field key supports interpolation functions evaluated per message of a batch, allowing you to create a unique key for each message.

The field fields allows you to specify an explicit map of field names to interpolated values, also evaluated per message of a batch:

redis_hash:
  url: tcp://localhost:6379
  key: ${!json_field:id}
  fields:
    topic: ${!metadata:kafka_topic}
    partition: ${!metadata:kafka_partition}
    content: ${!json_field:document.text}

If the field walk_metadata is set to true then Benthos will walk all metadata fields of messages and add them to the list of hash fields to set.

If the field walk_json_object is set to true then Benthos will walk each message as a JSON object, extracting keys and the string representation of their value and adds them to the list of hash fields to set.

The order of hash field extraction is as follows:

  1. Metadata (if enabled)
  2. JSON object (if enabled)
  3. Explicit fields

Where latter stages will overwrite matching field names of a former stage.

Performance

This output benefits from sending multiple messages in flight in parallel for improved performance. You can tune the max number of in flight messages with the field max_in_flight.


redis_list

type: redis_list
redis_list:
  key: benthos_list
  max_in_flight: 1
  url: tcp://localhost:6379

Pushes messages onto the end of a Redis list (which is created if it doesn't already exist) using the RPUSH command.

Performance

This output benefits from sending multiple messages in flight in parallel for improved performance. You can tune the max number of in flight messages with the field max_in_flight.


redis_pubsub

type: redis_pubsub
redis_pubsub:
  channel: benthos_chan
  max_in_flight: 1
  url: tcp://localhost:6379

Publishes messages through the Redis PubSub model. It is not possible to guarantee that messages have been received.

This output will interpolate functions within the channel field, you can find a list of functions here.

Performance

This output benefits from sending multiple messages in flight in parallel for improved performance. You can tune the max number of in flight messages with the field max_in_flight.


redis_streams

type: redis_streams
redis_streams:
  body_key: body
  max_in_flight: 1
  max_length: 0
  stream: benthos_stream
  url: tcp://localhost:6379

Pushes messages to a Redis (v5.0+) Stream (which is created if it doesn't already exist) using the XADD command. It's possible to specify a maximum length of the target stream by setting the max_length field to a value greater than 0, in which case this cap is applied only when Redis is able to remove a whole macro node, for efficiency.

Redis stream entries are key/value pairs, so it is necessary to specify the key (the body_key field) under which the body of the message is set. All metadata fields of the message will also be set as key/value pairs; if there is a key collision between a metadata item and the body then the body takes precedence.

Performance

This output benefits from sending multiple messages in flight in parallel for improved performance. You can tune the max number of in flight messages with the field max_in_flight.


retry

type: retry
retry:
  backoff:
    initial_interval: 500ms
    max_elapsed_time: 0s
    max_interval: 3s
  max_retries: 0
  output: {}

Attempts to write messages to a child output, and if the write fails for any reason the message is retried either until success or, if the max_retries or max_elapsed_time fields are non-zero, until either limit is reached.

All messages in Benthos are always retried on an output error, but this would usually involve propagating the error back to the source of the message, whereby it would be reprocessed before reaching the output layer once again.

This output type is useful whenever we wish to avoid reprocessing a message on the event of a failed send. We might, for example, have a dedupe processor that we want to avoid reapplying to the same message more than once in the pipeline.

Rather than retrying the same output you may wish to retry the send using a different output target (a dead letter queue), in which case you should instead use the try output type.
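
As a sketch, wrapping a hypothetical output type foo with a bounded retry policy might look like this:

output:
  retry:
    # Give up after 5 retries; the error then propagates upstream.
    max_retries: 5
    backoff:
      initial_interval: 500ms
      max_interval: 3s
    output:
      type: foo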


s3

type: s3
s3:
  bucket: ""
  content_encoding: ""
  content_type: application/octet-stream
  credentials:
    id: ""
    profile: ""
    role: ""
    role_external_id: ""
    secret: ""
    token: ""
  endpoint: ""
  force_path_style_urls: false
  kms_key_id: ""
  max_in_flight: 1
  path: ${!count:files}-${!timestamp_unix_nano}.txt
  region: eu-west-1
  storage_class: STANDARD
  timeout: 5s

Sends message parts as objects to an Amazon S3 bucket. Each object is uploaded with the path specified with the path field.

In order to have a different path for each object you should use function interpolations described here, which are calculated per message of a batch.

The fields content_type, content_encoding and storage_class can also be set dynamically using function interpolation.

Credentials

By default Benthos will use a shared credentials file when connecting to AWS services. It's also possible to set them explicitly at the component level, allowing you to transfer data across accounts. You can find out more in this document.

Performance

This output benefits from sending multiple messages in flight in parallel for improved performance. You can tune the max number of in flight messages with the field max_in_flight.


sns

type: sns
sns:
  credentials:
    id: ""
    profile: ""
    role: ""
    role_external_id: ""
    secret: ""
    token: ""
  endpoint: ""
  max_in_flight: 1
  region: eu-west-1
  timeout: 5s
  topic_arn: ""

Sends messages to an AWS SNS topic.

Credentials

By default Benthos will use a shared credentials file when connecting to AWS services. It's also possible to set them explicitly at the component level, allowing you to transfer data across accounts. You can find out more in this document.

Performance

This output benefits from sending multiple messages in flight in parallel for improved performance. You can tune the max number of in flight messages with the field max_in_flight.


socket

type: socket
socket:
  address: /tmp/benthos.sock
  network: unix

Sends messages as a continuous stream of line delimited data over a (tcp/udp/unix) socket by connecting to a server.

If batched messages are sent the final message of the batch will be followed by two line breaks in order to indicate the end of the batch.


sqs

type: sqs
sqs:
  backoff:
    initial_interval: 1s
    max_elapsed_time: 30s
    max_interval: 5s
  batching:
    byte_size: 0
    condition:
      type: static
      static: false
    count: 1
    period: ""
  credentials:
    id: ""
    profile: ""
    role: ""
    role_external_id: ""
    secret: ""
    token: ""
  endpoint: ""
  max_in_flight: 1
  max_retries: 0
  message_deduplication_id: ""
  message_group_id: ""
  region: eu-west-1
  url: ""

Sends messages to an SQS queue. Metadata values are sent along with the payload as attributes with the data type String. If the number of metadata values in a message exceeds the message attribute limit (10) then the top ten keys ordered alphabetically will be selected.

The fields message_group_id and message_deduplication_id can be set dynamically using function interpolations, which are resolved individually for each message of a batch.
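
For FIFO queues these fields might be interpolated per message like the following sketch (the queue URL and JSON paths are illustrative):

output:
  sqs:
    url: https://sqs.eu-west-1.amazonaws.com/123456789012/example-queue.fifo
    region: eu-west-1
    # Messages sharing a user.id are delivered in order within that group.
    message_group_id: ${!json_field:user.id}
    message_deduplication_id: ${!json_field:event.id}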

Credentials

By default Benthos will use a shared credentials file when connecting to AWS services. It's also possible to set them explicitly at the component level, allowing you to transfer data across accounts. You can find out more in this document.

Performance

This output benefits from sending multiple messages in flight in parallel for improved performance. You can tune the max number of in flight messages with the field max_in_flight.

This output benefits from sending messages as a batch for improved performance. Batches can be formed at both the input and output level. You can find out more in this doc.


stdout

type: stdout
stdout:
  delimiter: ""

The stdout output type prints messages to stdout. Single part messages are printed with a delimiter (defaults to '\n' if left empty). Multipart messages are written with each part delimited, with the final part followed by two delimiters, e.g. a multipart message [ "foo", "bar", "baz" ] would be written as:

foo\n
bar\n
baz\n\n


switch

type: switch
switch:
  outputs: []
  retry_until_success: true

The switch output type allows you to configure multiple conditional output targets by listing child outputs paired with conditions. Conditional logic is currently applied per whole message batch. In order to multiplex per message of a batch use the broker output with the pattern fan_out.

In the following example, messages containing "foo" will be sent to both the foo and baz outputs. Messages containing "bar" will be sent to both the bar and baz outputs. Messages containing both "foo" and "bar" will be sent to all three outputs. And finally, messages that do not contain "foo" or "bar" will be sent to the baz output only.

output:
  switch:
    retry_until_success: true
    outputs:
    - output:
        foo:
          foo_field_1: value1
      condition:
        text:
          operator: contains
          arg: foo
      fallthrough: true
    - output:
        bar:
          bar_field_1: value2
          bar_field_2: value3
      condition:
        text:
          operator: contains
          arg: bar
      fallthrough: true
    - output:
        baz:
          baz_field_1: value4
        processors:
        - type: baz_processor
  processors:
  - type: some_processor

The switch output requires a minimum of two outputs. If no condition is defined for an output, it behaves like a static true condition. If fallthrough is set to true, the switch output will continue evaluating additional outputs after finding a match.

Messages that do not match any outputs will be dropped. If an output applies back pressure it will block all subsequent messages.

If an output fails to send a message it will be retried continuously until completion or service shut down. You can change this behaviour so that when an output returns an error the switch output also returns an error by setting retry_until_success to false. This allows you to wrap the switch with a try broker, but care must be taken to ensure duplicate messages aren't introduced during error conditions.


sync_response

type: sync_response
sync_response: {}

Returns the final message payload back to the input origin of the message, where it is dealt with according to that specific input type.

For most inputs this mechanism is ignored entirely, in which case the sync response is dropped without penalty. It is therefore safe to use this output even when combining input types that might not have support for sync responses. An example of an input able to utilise this is the http_server.

It is safe to combine this output with others using broker types. For example, with the http_server input we could send the payload to a Kafka topic and also send a modified payload back with:

input:
  http_server:
    path: /post
output:
  broker:
    pattern: fan_out
    outputs:
    - kafka:
        addresses: [ TODO:9092 ]
        topic: foo_topic
    - type: sync_response
      processors:
      - text:
          operator: to_upper

Using the above example and posting the message 'hello world' to the endpoint /post Benthos would send it unchanged to the topic foo_topic and also respond with 'HELLO WORLD'.

For more information please read Synchronous Responses.


try

type: try
try: []

Attempts to send each message to only one output, starting from the first output on the list. If an output attempt fails then the next output in the list is attempted, and so on.

This pattern is useful for triggering events in the case where certain output targets have broken. For example, if you had an output type http_client but wished to reroute messages whenever the endpoint becomes unreachable you could use this pattern:

output:
  try:
  - http_client:
      url: http://foo:4195/post/might/become/unreachable
      retries: 3
      retry_period: 1s
  - http_client:
      url: http://bar:4196/somewhere/else
      retries: 3
      retry_period: 1s
    processors:
    - text:
        operator: prepend
        value: 'failed to send this message to foo: '
  - file:
      path: /usr/local/benthos/everything_failed.jsonl

websocket

type: websocket
websocket:
  basic_auth:
    enabled: false
    password: ""
    username: ""
  oauth:
    access_token: ""
    access_token_secret: ""
    consumer_key: ""
    consumer_secret: ""
    enabled: false
    request_url: ""
  url: ws://localhost:4195/post/ws

Sends messages to an HTTP server via a websocket connection.