대용량 로그 빠르게 읽어내기
대용량의 로그처리를 go에서 어떻게 할 수 있을까 찾아보다가 미디엄에서 go를 이용해 대용량의 로그 파일을 빠른 시간에 읽는 코드를 보게 되어 포스팅 하려한다.
https://medium.com/swlh/processing-16gb-file-in-seconds-go-lang-3982c235dfa2
파일 읽기 코드
1
2
3
4
5
6
7
file, err := os.Open(fileName)
if err != nil {
log.Print(err)
}
if file != nil {
defer file.Close()
}
이렇게 파일을 열고 난 후에 파일을 읽는 방식은 2가지가 있다.
- 파일을 한 줄씩 읽는다.
메모리에는 부담이 적지만, Input, Output에 많은 시간이 든다. - 파일 전체를 한 번에 읽고 처리한다.
메모리를 훨씬 많이 사용하므로 시간이 훨씬 많이 걸린다.
16GB이상의 대용량 로그라면 ?
1번 방식은 상당히 오랜 시간이 걸릴 것이고, 2번 방식은 거의 불가능 할 것이다.
그럼 어떻게 해야할까…
바로 chunk 단위로 파일을 읽어서 데이터를 처리한다.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
reader := bufio.NewReader(file)
for {
buf := make([]byte, 1024*4) // chunk size
n, err := reader.Read(buf)
buf = buf[:n]
if n == 0 {
if err != nil || err == io.EOF {
log.Println(err)
// log.Fatal(err)
break
}
return err
}
}
대용량의 데이터라면 좀 더 신경써서 처리한다.
sync.Pool과 goroutine을 사용하여 대량의 데이터를 동시에 읽고 처리할 수 있다.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
func fileRead(file *os.File) (err error) {
linesPool := sync.Pool{New: func() interface{} {
lines := make([]byte, 500*1024)
return lines
}}
stringPool := sync.Pool{New: func() interface{} {
lines := ""
return lines
}}
slicePool := sync.Pool{New: func() interface{} {
lines := make([]string, 100)
return lines
}}
reader := bufio.NewReader(file)
d := 0
for {
buf := linesPool.Get().([]byte)
n, err := reader.Read(buf)
// fmt.Printf("%v\n", n)
buf = buf[:n]
if n == 0 {
if err != nil || err == io.EOF {
log.Println(err)
// log.Fatal(err)
break
}
return err
}
// 개행될 때까지 더 진행 (한 문장단위)
nextUntilNewLine, err := reader.ReadBytes('\n')
if err != io.EOF {
buf = append(buf, nextUntilNewLine...)
}
d += processChunk(buf, &linesPool, &stringPool, &slicePool)
}
return nil
}
chunk 단위로 끊어서 읽었다면, 해당 로그에 전처리 작업을 진행할 수 있다.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
func processChunk(chunk []byte, linesPool *sync.Pool, stringPool *sync.Pool, slicePool *sync.Pool) int {
//another wait group to process every chunk further
var wg2 sync.WaitGroup
// stringPool 불러오기
logs := stringPool.Get().(string)
// 청크데이터를 문자열 변환
logs = string(chunk)
linesPool.Put(chunk) // put back the chunk in pool
// slicePool 가져오기
logSlice := slicePool.Get().([]string)
// 개행 기준으로 string 배열 생성
logSlice = strings.Split(logs, "\n")
// stringPool 반환
stringPool.Put(logs)
// 100줄만 읽기
chunkSize := 100
length := len(logSlice)
// 청크 탐색
for i := 0; i < length; i += chunkSize {
wg2.Add(1)
// 청크 계산
start := i * chunkSize
end := minInt((i+1)*chunkSize, len(logSlice))
for i := start; i < end; i++ {
text := logSlice[i]
if len(text) == 0 {
continue
}
}
// 전처리 작업용
/*
go func(start, end int) {
for i:=start; i<end; i++ {
text := logSlice[i]
if len(text) == 0 {
continue
}
}
}
*/
}
// 청크 다 끝날떄까지 기다리기
wg2.Wait()
// slicePool 반환
slicePool.Put(logSlice)
return 1
}
stringPool을 통해 로그들을 받을 준비를 해주고, linesPool은 사용이 끝나 Put으로 반환한다.
slicePool은 logs들을 \n 단위로 쪼개 저장한다.
이렇게 읽은 데이터를 goroutine을 통해 전처리를 병렬적으로 진행해준다.
FullCode 및 결과는 여기서
This post is licensed under CC BY 4.0 by the author.