/** * Copyright 2016 Confluent Inc. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ // Example function-based Apache Kafka producer package main import ( "fmt" "os" "time" "github.com/confluentinc/confluent-kafka-go/v2/kafka" ) func main() { if len(os.Args) != 3 { fmt.Fprintf(os.Stderr, "Usage: %s \n", os.Args[0]) os.Exit(1) } bootstrapServers := os.Args[1] topic := os.Args[2] totalMsgcnt := 3 p, err := kafka.NewProducer(&kafka.ConfigMap{"bootstrap.servers": bootstrapServers}) if err != nil { fmt.Printf("Failed to create producer: %s\n", err) os.Exit(1) } fmt.Printf("Created Producer %v\n", p) // Listen to all the events on the default events channel go func() { for e := range p.Events() { switch ev := e.(type) { case *kafka.Message: // The message delivery report, indicating success or // permanent failure after retries have been exhausted. // Application level retries won't help since the client // is already configured to do that. m := ev if m.TopicPartition.Error != nil { fmt.Printf("Delivery failed: %v\n", m.TopicPartition.Error) } else { fmt.Printf("Delivered message to topic %s [%d] at offset %v\n", *m.TopicPartition.Topic, m.TopicPartition.Partition, m.TopicPartition.Offset) } case kafka.Error: // Generic client instance-level errors, such as // broker connection failures, authentication issues, etc. // // These errors should generally be considered informational // as the underlying client will automatically try to // recover from any errors encountered, the application // does not need to take action on them. fmt.Printf("Error: %v\n", ev) default: fmt.Printf("Ignored event: %s\n", ev) } } }() msgcnt := 0 for msgcnt < totalMsgcnt { value := fmt.Sprintf("Producer example, message #%d", msgcnt) err = p.Produce(&kafka.Message{ TopicPartition: kafka.TopicPartition{Topic: &topic, Partition: kafka.PartitionAny}, Value: []byte(value), Headers: []kafka.Header{{Key: "myTestHeader", Value: []byte("header values are binary")}}, }, nil) if err != nil { if err.(kafka.Error).Code() == kafka.ErrQueueFull { // Producer queue is full, wait 1s for messages // to be delivered then try again. time.Sleep(time.Second) continue } fmt.Printf("Failed to produce message: %v\n", err) } msgcnt++ } // Flush and close the producer and the events channel for p.Flush(10000) > 0 { fmt.Print("Still waiting to flush outstanding messages\n") } p.Close() }