/**
 * Copyright 2016 Confluent Inc.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

// Example function-based high-level Apache Kafka consumer
package main

// consumer_example implements a consumer using the non-channel Poll() API
// to retrieve messages and events.

import (
	"fmt"
	"os"
	"os/signal"
	"syscall"

	"github.com/confluentinc/confluent-kafka-go/v2/kafka"
)

func main() {

	if len(os.Args) < 4 {
		fmt.Fprintf(os.Stderr, "Usage: %s <bootstrap-servers> <group> <topics..>\n",
			os.Args[0])
		os.Exit(1)
	}

	bootstrapServers := os.Args[1]
	group := os.Args[2]
	topics := os.Args[3:]

	sigchan := make(chan os.Signal, 1)
	signal.Notify(sigchan, syscall.SIGINT, syscall.SIGTERM)

	c, err := kafka.NewConsumer(&kafka.ConfigMap{
		"bootstrap.servers": bootstrapServers,
		// Avoid connecting to IPv6 brokers:
		// This is needed for the ErrAllBrokersDown show-case below
		// when using localhost brokers on OSX, since the OSX resolver
		// will return the IPv6 addresses first.
		// You typically don't need to specify this configuration property.
		"broker.address.family": "v4",
		"group.id":              group,
		"session.timeout.ms":    6000,
		// Start reading from the first message of each assigned
		// partition if there are no previously committed offsets
		// for this group.
		"auto.offset.reset": "earliest",
		// Disable automatic offset storage so that offsets are only
		// stored manually, after each message has been fully processed.
		"enable.auto.offset.store": false,
	})

	if err != nil {
		fmt.Fprintf(os.Stderr, "Failed to create consumer: %s\n", err)
		os.Exit(1)
	}

	fmt.Printf("Created Consumer %v\n", c)

	err = c.SubscribeTopics(topics, nil)
	if err != nil {
		fmt.Fprintf(os.Stderr, "Failed to subscribe to topics: %s\n", err)
		os.Exit(1)
	}

	run := true

	for run {
		select {
		case sig := <-sigchan:
			fmt.Printf("Caught signal %v: terminating\n", sig)
			run = false
		default:
			ev := c.Poll(100)
			if ev == nil {
				continue
			}

			switch e := ev.(type) {
			case *kafka.Message:
				// Process the message received.
				fmt.Printf("%% Message on %s:\n%s\n",
					e.TopicPartition, string(e.Value))
				if e.Headers != nil {
					fmt.Printf("%% Headers: %v\n", e.Headers)
				}

				// We can store the offsets of the messages manually or let
				// the library do it automatically based on the setting
				// enable.auto.offset.store. Once an offset is stored, the
				// library takes care of periodically committing it to the broker
				// if enable.auto.commit isn't set to false (the default is true).
				// By storing the offsets manually after completely processing
				// each message, we can ensure at-least-once processing.
				_, err := c.StoreMessage(e)
				if err != nil {
					fmt.Fprintf(os.Stderr, "%% Error storing offset after message %s: %v\n",
						e.TopicPartition, err)
				}
			case kafka.Error:
				// Errors should generally be considered
				// informational; the client will try to
				// recover automatically.
				// But in this example we choose to terminate
				// the application if all brokers are down.
				fmt.Fprintf(os.Stderr, "%% Error: %v: %v\n", e.Code(), e)
				if e.Code() == kafka.ErrAllBrokersDown {
					run = false
				}
			default:
				fmt.Printf("Ignored %v\n", e)
			}
		}
	}

	fmt.Printf("Closing consumer\n")
	c.Close()
}
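
// Example invocation, shown here as a sketch: the broker address, group id,
// and topic name below are illustrative assumptions, not part of the example.
//
//	go run consumer_example.go localhost:9092 myGroup myTopic
//
// The consumer then polls until SIGINT/SIGTERM is received (or all brokers
// are reported down), storing offsets only after each message is printed.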