本文翻譯自 NY Times,原作者為 JP Robinson:https://open.blogs.nytimes.com/2014/07/10/emr-streaming-in-go/?_php=true&_type=blogs&_php=true&_type=blogs&_r=1
我們的平台團隊使用 Amazon 的 Elastic MapReduce (EMR) 服務,協助我們從日誌檔收集有用的數據。我們有一些程序會抓取日誌檔,接著壓縮並推送到 Amazon S3 上儲存。感謝 EMR,這套模式聚集出回溯數年之久的龐大資訊,等待我們進行資料重整 (data crunching)。一開始我們用 Python 做了不少苦功,不過我們漸漸地轉而依賴 Go。
當我們一開始使用 EMR,我的團隊用 Python 來寫 mapper 跟 reducer 腳本。之所以選擇 Python,是因為只要最簡單的設定與程式碼,就可以寫出一支能夠從標準輸入讀取 JSON 或 CSV 記錄,並向標準輸出寫出類似結構化資料的 Python 腳本。還有,既然我們已經決定使用 boto Python 程式庫來啟動 EMR 工作流,並管理其輸出,在整個專案中使用同一種語言是較為合理的。
Python 對於簡單的處理做得很好。有一段時間的性能表現相當不錯,既然該腳本十分簡單,我們甚至不需要用到第三方模組。最後,我們遇到的情況需要引入一個內部模組,來重用部分商業邏輯,或是藉由第三方模組來挖掘資料的意義,因此在這個 EMR 工作流增加了新一層的複雜度:bootstrap 動作。
一旦準備完成之後,Bootstrap 動作會在叢集的每一個節點上執行,對於簡單的第三方程式庫,bootstrap 動作可以執行類似 sudo apt-get install python-nltk 的命令。要載入紐約時報內部的程式庫,我們必須把它放在 S3 上,然後寫一支安裝腳本當作 bootstrap 動作。兩種情況都很繁瑣且耗時,我們希望有更優雅的解決方案。既然我們越來越常用 Go 來寫程式庫和服務,我們就想它是不是適用於這個情況。
我的團隊花了一年時間,用 Go 建構後端服務與 web APIs。正如 Dave Cheney 最近在 Gocon 發表的演說所指出,由於 Go 提供的並行性、易於部署、優秀效能,它通常是一個絕佳的選擇。此外,它的語法清晰,有動態語言的感覺,但又仍屬靜態型別。
當我們在線上有了若干 Go 的服務,我們注意到,在串流 mapper 之中重用這些服務中的部分邏輯,看來是可行的。結果用 Go 來寫 mapper (還有 reducer) 和 Python 比起來,並不會花太多力氣。為了示範,我會並列出用 Python 和 Go 所寫的簡單 mapper/reducer 當作例子。
#!/usr/bin/python
import sys
import simplejson as json
def main():
# loop through each line of stdin
for line in sys.stdin:
try:
# parse the incoming json
j = json.loads(line.strip())
# initialize output structure
output = dict()
# grab an identifier
output["key"] = j["data"]["key"]
# and any other useful information from input json
output["secondary-key"] = j["data"]["another-key"]
output["first-metric"] = j["data"]["metric"]
output["second-metric"] = j["data"]["metric-2"]
except Exception as e:
sys.stderr.write("unable to read log: %s" % e)
continue
try:
# generate json output
output_json = json.dumps(output)
# write the key and json to stdout
print "%s\t%s" % (output["key"], output_json)
except Exception as e:
sys.stderr.write("unable to write mapper output: %s" % e)
continue
if __name__ == "__main__":
main()
package main
import (
"bufio"
"encoding/json"
"fmt"
"log"
"os"
)
func main() {
var line []byte
var input logRecord
var output mapperOutput
var outputJSON []byte
var err error
// loop through each line of stdin
ls := bufio.NewScanner(os.Stdin)
for ls.Scan() {
line = ls.Bytes()
// parse the incoming json
if err = json.Unmarshal(line, &input); err != nil {
log.Print("unable to read log: ", err)
continue
}
// initialize output structure
output = mapperOutput{
// grab an identifier
input.Data.Key,
// and any other useful information from input json
input.Data.AnotherKey,
input.Data.Metric,
input.Data.AnotherMetric,
}
// generate json output
if outputJSON, err = json.Marshal(output); err != nil {
log.Print("unable to write mapper output: ", err)
continue
}
// write the key and json to stdout
fmt.Fprintf(os.Stdout, "%s\t%s\n", output.Key, outputJSON)
}
if ls.Err() != nil {
log.Print("error reading from stdin: ", ls.Err())
os.Exit(1)
}
}
type logRecord struct {
Data struct {
Key string `json:"key"`
AnotherKey string `json:"another-key"`
Metric int64 `json:"metric"`
AnotherMetric int64 `json:"metric-2"`
} `json:"data"`
}
type mapperOutput struct {
Key string `json:"key"`
SecondaryKey string `json:"secondary-key"`
FirstMetric int64 `json:"first-metric"`
SecondMetric int64 `json:"second-metric"`
}
#!/usr/bin/python
import sys
import simplejson as json
def main():
ongoing_count = {"key":""}
# loop through each line for stdin
for line in sys.stdin:
try:
# split line to separate key and value
key_val = line.split("\t", 1)
key = key_val[0]
# parse the incoming json
data = json.loads(key_val[1])
# check if incoming key equals ongoing key
if key == ongoing_count["key"]:
# inrement ongoing metrics
ongoing_count["first-metric"] += data["first-metric"]
ongoing_count["second-metric"] += data["second-metric"]
else:
# if a new key, emit ongoing counts
writeOutput(ongoing_count)
# set ongoing count with current data
ongoing_count = data
except Exception as e:
sys.stderr.write("unable to parse reducer input: %s" % e)
continue
# emit the final counts
writeOutput(ongoing_count)
def writeOutput(ongoing_count):
if ongoing_count["key"] != str():
try:
# generate json output
output_json = json.dumps(ongoing_count)
except Exception as e:
sys.stderr.write("unable to create reducer json: %s" % e)
continue
# write the key and json to stdout
print "%s\t%s" % (key, output_json)
if __name__ == "__main__":
main()
package main
import (
"bufio"
"bytes"
"encoding/json"
"fmt"
"log"
"os"
)
var tab = []byte("\t")
func main() {
var rawInput []string
var input mapperOutput
var ongoingCount mapperOutput
var err error
// loop through each line for stdin
ls := bufio.NewScanner(os.Stdin)
for ls.Scan() {
// split line to separate key and value
rawInput = bytes.SplitN(ls.Bytes(), tab, 2)
// parse the incoming json
if err = json.Unmarshal(rawInput[1], &input); err != nil {
log.Print("unable to parse reducer input: ", err)
continue
}
// check if incoming key equals ongoing key
if ongoingCount.Key == input.Key {
// inrement ongoing metrics
ongoingCount.FirstMetric += input.FirstMetric
ongoingCount.SecondMetric += input.SecondMetric
} else {
// if a new key, emit ongoing counts
writeOutput(ongoingCount)
// set ongoing count with current data
ongoingCount = input
}
}
if ls.Err() != nil {
log.Print("error reading from stdin: ", ls.Err())
os.Exit(1)
}
// emit the final counts
writeOutput(ongoingCount)
}
func writeOutput(o mapperOutput) {
if len(o.Key) == 0 {
return
}
// generate json output
data, err := json.Marshal(o)
if err != nil {
log.Print("unable to marshal reducer output: ", err)
return
}
// write the key and json to stdout
fmt.Fprintf(os.Stdout, "%s\t%s\n", o.Key, data)
}
type mapperOutput struct {
Key string `json:"key"`
SecondaryKey string `json:"secondary-key"`
FirstMetric int64 `json:"first-metric"`
SecondMetric int64 `json:"second-metric"`
}
Go 的實作在程式碼數量上有略微增加,不過,對照 Go 提供的簡易部署和效能提升,還是值得的。由於程式被編譯成單一的二進位檔,我們可以包進我們所需要的所有第三方程式庫,只要將我們的二進位檔放上 S3 就完成了部署工作,也不需要 bootstrap 動作。
我們在速度上有了很棒的提升。我在同一份資料上跑 Go 實作和舊的 Python mapper/reducer,歷經幾次執行之後,我發現平均約提升了 25% 的速度。兩者的這些 mapper/reducers 在 CSV、JSON、regex 上,都使用標準程式庫 (除了 Python 的 simplejson 是個例外)。
當我們持續建構與改進我們的平台技術,我們對 Go 越來越有信心且熟悉。從後台程序到簡單的 MapReduce 腳本,Go 都是我們團隊在伺服器端的優先選擇。它讓我們打造高性能且可靠又易於維護的服務,Go 社群的熱情,加上高品質釋出版本的速度,讓我們樂於渴望看到這套語言的未來。