저번 글에서는 Go로 S3 Uploader와 Downloader를 구현하고 strings.Trim() strings.Replace()를 사용하여 데이터 형식을 변경한 방법에 대해서 소개하였다. 자세한 글은 아래에서 확인 가능!
오늘은 Goroutin을 제어하는 방법 중 Mutex, Semaphore를 이용한 제어 방법과 그것을 어떻게 내 코드에 적용하였는가에 대해서 알아보겠다.
Mutex
Mutex는 Mutual Exclusion의 줄임말로 하나의 공유 자원에 대해서 반드시 접근 하나만이 가능하도록 제한하는 것을 말한다. 반드시 하나의 접근(Go의 경우 Goroutine이 된다)으로 작업하고자 하는 영역을 critical section이라 하며, mutex는 이 critical section에서 작업이 하나만 이뤄지는 것을 보장하기 위해 Lock과 Unlock을 사용한다 .
Golang에서는 sync 패키지의 Mutex 타입을 사용하여 이를 구현할 수 있다. mutex.Lock과 mutex.Unlock을 사용하여 그 사이에 있는 코드들을 critical section으로 만들어줄 수 있다.
나의 경우 개발을 하다가 "fatal error: concurrent map writes"라는 에러를 만나 mutex를 쓰게 되었다. (좋은 방법이 아니라는 것은 느껴지지만) 전역 변수인 map을 먼저 선언한 뒤, gorutine 으로 돌아가는 스크래핑 코드 내부에서 이 map을 초기화하고 거기에 데이터를 쓰는 방법을 취했다. 하지만 변수 초기화, 쓰기가 고루틴 안에서 이뤄지다 보니 동일한 변수에 대해서 서로 다른 초기화와 쓰기가 동시에 이뤄져 문제가 발생하였다.
그렇다면 애초에 map이 atomic한 연산을 지원했으면 문제가 아니지 않느냐 궁금할 수도 있을 것 같다. (나는 무지성으로 코드 짤 때는 안 궁금하다가 이후 이 블로그 글 쓰면서 공부하게 되었지만...) Go 개발자의 문서를 보면 아래와 같이 써있는 것을 볼 수 있다.
Why are map operations not defined to be atomic?
After long discussion it was decided that the typical use of maps did not require safe access from multiple goroutines, and in those cases where it did, the map was probably part of some larger data structure or computation that was already synchronized. Therefore requiring that all map operations grab a mutex would slow down most programs and add safety to few. This was not an easy decision, however, since it means uncontrolled map access can crash the program.
The language does not preclude atomic map updates. When required, such as when hosting an untrusted program, the implementation could interlock map access.
Map access is unsafe only when updates are occurring. As long as all goroutines are only reading—looking up elements in the map, including iterating through it using a for range loop—and not changing the map by assigning to elements or doing deletions, it is safe for them to access the map concurrently without synchronization.
As an aid to correct map use, some implementations of the language contain a special check that automatically reports at run time when a map is modified unsafely by concurrent execution.
요약하자면 map에 동시적으로 접근하여 쓰기 작업을 하는 상황을 특수한 상황이라고 판단했다는 것. (나는... 특수한 작업을 했다는 것이다...) 보통 작업에서는 그러지 않기 때문에 atomic한 것을 보장하는 것보다는 속도를 더 빠르게 하는 것을 택한 것 같았다.
여튼 사용 방법은 크게 어렵지 않기 때문에 아래와 같이 구현하였다.
// 전역 변수로 mutex 생성
var mutex = new(sync.Mutex)
// 스크래핑 파트에서 필요한 정보 모두 기록 & concurrent map writes 에러를 피하기 위한 mutex 설정
mutex.Lock()
subwayData = make(map[string]string)
subwayData["lineNum"] = lineNum
subwayData["stationNm"] = stationNm
subwayData["weekTag"] = tag
subwayData["arriveTime"] = arriveTime
subwayData["inOutTag"] = inOutTag
data = append(data, subwayData)
mutex.Unlock()
Semaphore
Mutex가 하나의 접근만을 허용한다면, Semaphore를 이용하면 N개의 접근을 허용한다.
Golang에서 semaphore를 구현하는 방법은 2가지가 있는 것 같다
1. channel을 이용한 구현
2. golang.org/x/sync에 있는 weighted semaphore 사용하기
나는 어쩌다보니 1번 방향으로 구현하였는데(chatgpt씨가 저기로 안내했음...) 찾아보니 1번 방법이 조금 더 Go스러운 방법인 것 같다는 의견이 있는 것 같다. 1번으로도 충분히 구현 가능하지만, 2번을 사용하면 좀 더 정교하게 semaphore 구현과 컨트롤이 가능하다고 하는 것 같긴 하다. (한 번에 작업을 할 수 있는 스레드의 수를 제한, 즉 작업 허가증의 수를 정하고 그 이용권을 Accuire해서 작업, 작업 이후에는 Release로 반납하는 방식인 것 같다)
여튼 내가 구현한 코드는 다음과 같다.
- 한 호선에 대한 {stationNm:"역이름", naverCode:역코드} 로 이루어진 slice를 info에 담는다
- for 문으로 info의 원소에 접근하고, 그 원소에 있는 정보들을 바탕으로 스크래퍼 코드(runCrawler)를 고루틴으로 돌린다
- 이때 고루틴은 최대 15개까지만 돌도록 제한하며
- 15개의 고루틴이 돌아간 이후에는 3초 간의 휴식 시간을 강제로 갖게 한다 (네이버 서버 부담을 줄이기 위함)
Semaphore를 활용한 Goroutine 제어 소스 코드
var (
subwayData map[string]string
data []map[string]string
mutex = new(sync.Mutex)
)
func HandleRequest(_ context.Context) (string, error) {
targetLines = targetLines[startLine:endLine]
// 각 역의 정보를 바탕으로 크롤링 시작. for문을 돌며 호선 이름과 그 호선에 해당하는 역 정보 가져오고 -> 그거 바탕으로 크롤링
var baseURL string = "https://pts.map.naver.com/end-subway/ends/web/"
for _, lineNum := range targetLines {
info, _ := INFO[lineNum] // info: target 호선의 naverCode, stationNm으로 이루어진 slice
data = make([]map[string]string, 0, len(info)) // 각 호선의 데이터만 담을 수 있도록 슬라이스 초기화
fmt.Println("타겟 라인: ", lineNum, " 크롤링 시작")
// go 루틴 개수 15개로 제한, 각 go 루틴 실행 종료 시점 수집을 위한 채널 생성
sem := make(chan struct{}, 15)
done := make(chan struct{}, len(info))
for i, val := range info {
sem <- struct{}{} // 최대 go 루틴의 개수 제어
go func(val map[string]interface{}) {
runCrawler(val, baseURL, lineNum)
<-sem // go 루틴이 종료되면 세마포어 반환
done <- struct{}{} // go 루틴이 종료되었음을 알리기 위해 done 채널에 값 전송
}(val)
if (i+1)%15 == 0 { // 네이버 서버에 부담을 주지 않도록 고루틴 15개마다 sleep
for j := 0; j < 15; j++ {
<-done
}
fmt.Println("--- 3초 휴식 ---")
time.Sleep(3 * time.Second)
}
}
// 모든 go 루틴이 종료될 때까지 done 채널에서 값 수신
for i := 0; i < len(info)%15; i++ {
<-done
}
}
}
semaphore 구현 위주로 하나하나 뜯어보자
sem은 실제 고루틴의 병렬처리 갯수를 제한하기 위해, done은 고루틴이 종료될 때까지 프로그램을 대기시키기 위해 만든 채널이다.
후자 채널의 경우 채널이 수신자와 송신자를 서로 기다리는 속성을 이용한 것이다. 메인 함수는 고루틴 실행을 기다려주지 않기 때문에 (데이터 교환 측면에서는 의미 없는) 채널을 생성하여 강제로 기다리게 하는 것. 심플한 예시를 보면 아래와 같다.
func main() {
done := make(chan struct{})
go func() {
for i := 0; i < 5; i++ {
fmt.Println(i)
}
done <- struct{}{}
}()
// 고루틴이 끝날 때까지 대기
<-done
}
소스 코드와 바로 위 예시 모두 channel의 자료형을 struct로 하는 것이 독특한 점인데, 이는 이후에 channel에서 다루는 값을 크기가 0인 빈 구조체(struct{}{})로 하여 메모리를 효율성을 보장하기 위해서이다. (물론 처음에는 chatGPT씨가 짜줘서 쓴거지만, 후에 찾아보니 그런 심오한 뜻이 있더라... 참고자료1, 참고자료2) 여튼 내 코드로 돌아오면, 채널 2개를 아래와 같이 생성하였다
# 사이즈가 15인 채널을 생성한다 = 작업 허가증을 15개 발행한다
sem := make(chan struct{}, 15)
# 고루틴이 모두 끝낼 때까지 프로그램이 대기하게 만들기 위한 채널을 추가 생성
done := make(chan struct{}, len(info))
본격적으로 for 문을 돌며 info 안에 있는 원소들에 대해서 작업을 실행하게 된다. 흐름은 코드에 대한 설명은 아래와 같다.
for문을 하나 돌때마다 sem에 빈 구조체 하나를 넣어준다(작업 허가증을 하나 준다).
이때 sem의 buffer size는 15이므로 16번째 for문을 돌며 코드를 실행하려고 해도 앞에 작업이 끝나고 sem에서 값이 하나 빠져나가(<-sem)지 않는 이상 대기해야 한다.
sem <- struct{}{}를 통과한다면 runCrawler 함수를 실행하게 된다. 이때 runCrawler 함수가 모두 실행되고 나서야 <-sem이 실행된다. 또 바로 이어 done 채널에 빈 구조체 하나가 들어오게 된다.
만약 for문이 15배수로 실행되었다면 done에서 빈 구조체 15개를 빼온다(만약 아직 15개가 안 채워졌다면, 채널의 속성(데이터가 들어올때까지 대기)에 의해 기다리게 된다). 15개의 빈 구조체가 done에서 빠지게 되면 time.Sleep을 통해 for문이 돌아가는 것을 3초간 강제 휴식 시킨다.
for i, val := range info {
sem <- struct{}{} // semaphore를 획득하여 최대 go 루틴의 개수 제어
go func(val map[string]interface{}) {
runCrawler(val, baseURL, lineNum)
<-sem // go 루틴이 종료되면 semaphore를 반환
done <- struct{}{} // go 루틴이 종료되었음을 알리기 위해 done 채널에 값 전송
}(val)
if (i+1)%15 == 0 { // 네이버 서버에 부담을 주지 않도록 고루틴 15개마다 sleep
for j := 0; j < 15; j++ {
<-done
}
fmt.Println("--- 3초 휴식 ---")
time.Sleep(3 * time.Second)
}
}
이후 info의 모든 원소에 대해 익명함수가 돌아갈 때까지, 즉 info의 원소 수만큼 done 채널 값이 반납될때까지 대기하게 한다. 이때 이미 위에서 15배수만큼 done 채널 값이 반납되었으므로, 전체 info의 원소 갯수에서 15로 나누고 나머지 값 만큼의 done 채널 값만 반환되면 된다.
// 모든 go 루틴이 종료될 때까지 done 채널에서 값 수신
for i := 0; i < len(info)%15; i++ {
<-done
}
참고 자료