Post

대용량 로그 빠르게 읽어내기

대용량의 로그처리를 go에서 어떻게 할 수 있을까 찾아보다가 미디엄에서 go를 이용해 대용량의 로그 파일을 빠른 시간에 읽는 코드를 보게 되어 포스팅 하려한다.

https://medium.com/swlh/processing-16gb-file-in-seconds-go-lang-3982c235dfa2

파일 읽기 코드

1
2
3
4
5
6
7
file, err := os.Open(fileName)
if err != nil {
    log.Print(err)
}
if file != nil {
    defer file.Close()
}

이렇게 파일을 열고 난 후에 파일을 읽는 방식은 2가지가 있다.

  1. 파일을 한 줄씩 읽는다.
    메모리에는 부담이 적지만, Input, Output에 많은 시간이 든다.
  2. 파일 전체를 한 번에 읽고 처리한다.
    메모리를 훨씬 많이 사용하므로 시간이 훨씬 많이 걸린다.

16GB이상의 대용량 로그라면 ?

1번 방식은 상당히 오랜 시간이 걸릴 것이고, 2번 방식은 거의 불가능 할 것이다.
그럼 어떻게 해야할까…

바로 chunk 단위로 파일을 읽어서 데이터를 처리한다.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
reader := bufio.NewReader(file)
for {
    buf := make([]byte, 1024*4) // chunk size
    n, err := reader.Read(buf)
    buf = buf[:n]
    if n == 0 {
        if err != nil || err == io.EOF {
            log.Println(err)
            // log.Fatal(err)
            break
        }
        return err
    }
}

대용량의 데이터라면 좀 더 신경써서 처리한다.
sync.Pool과 goroutine을 사용하여 대량의 데이터를 동시에 읽고 처리할 수 있다.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
func fileRead(file *os.File) (err error) {
	linesPool := sync.Pool{New: func() interface{} {
		lines := make([]byte, 500*1024)
		return lines
	}}
	stringPool := sync.Pool{New: func() interface{} {
		lines := ""
		return lines
	}}
	slicePool := sync.Pool{New: func() interface{} {
		lines := make([]string, 100)
		return lines
	}}

	reader := bufio.NewReader(file)
	d := 0

	for {
		buf := linesPool.Get().([]byte)
		n, err := reader.Read(buf)
		// fmt.Printf("%v\n", n)
		buf = buf[:n]

		if n == 0 {
			if err != nil || err == io.EOF {
				log.Println(err)
				// log.Fatal(err)
				break
			}

			return err
		}

        // 개행될 때까지 더 진행 (한 문장단위)
		nextUntilNewLine, err := reader.ReadBytes('\n')
		if err != io.EOF {
			buf = append(buf, nextUntilNewLine...)
		}

		d += processChunk(buf, &linesPool, &stringPool, &slicePool)
	}
	return nil
}

chunk 단위로 끊어서 읽었다면, 해당 로그에 전처리 작업을 진행할 수 있다.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
func processChunk(chunk []byte, linesPool *sync.Pool, stringPool *sync.Pool, slicePool *sync.Pool) int {

	//another wait group to process every chunk further
	var wg2 sync.WaitGroup
	// stringPool 불러오기
	logs := stringPool.Get().(string)

	// 청크데이터를 문자열 변환
	logs = string(chunk)
	linesPool.Put(chunk) // put back the chunk in pool

	// slicePool 가져오기
	logSlice := slicePool.Get().([]string)
	// 개행 기준으로 string 배열 생성
	logSlice = strings.Split(logs, "\n")

	// stringPool 반환
	stringPool.Put(logs)

	// 100줄만 읽기
	chunkSize := 100
	length := len(logSlice)

	// 청크 탐색
	for i := 0; i < length; i += chunkSize {
		wg2.Add(1)
		// 청크 계산
		start := i * chunkSize
		end := minInt((i+1)*chunkSize, len(logSlice))
		for i := start; i < end; i++ {
			text := logSlice[i]
			if len(text) == 0 {
				continue
			}
		}

		// 전처리 작업용
		/*
			go func(start, end int) {
				for i:=start; i<end; i++ {
					text := logSlice[i]
					if len(text) == 0 {
						continue
					}

				}
			}
		*/

	}
	// 청크 다 끝날떄까지 기다리기  
        wg2.Wait()

	// slicePool 반환
	slicePool.Put(logSlice)
	return 1
}

stringPool을 통해 로그들을 받을 준비를 해주고, linesPool은 사용이 끝나 Put으로 반환한다.
slicePool은 logs들을 \n 단위로 쪼개 저장한다.
이렇게 읽은 데이터를 goroutine을 통해 전처리를 병렬적으로 진행해준다.


FullCode 및 결과는 여기서

This post is licensed under CC BY 4.0 by the author.