본문 바로가기
프로젝트 일지/BMT

5. Go를 이용한 스크래퍼 리팩토링하기 (1) - Introduction, 그런데 goquery, S3 Upload, Lambda 활용 코드를 곁들인

by 데브겸 2023. 8. 8.

이 글에서는 go에서 goquery, lambda, s3 다루는 법을 자세하게 설명하지는 않습니다.

이 블로그에 다른 글을 통해 업로드 할 예정입니다. 그래도 제가 사용한 소스 코드 전체는 있으니 참고 가능합니다.

 

 

지난 번에 Python의 Multiprocessing 모듈을 활용하여 병렬처리를 하고, 해당 코드를 AWS Lambda에 올리려 했다가 대차게 실패했다는 글을 적었다. 해당 글은 아래에 https://kyumcoding.tistory.com/42

 

4. 파이썬 크롤링 코드에 Process, Pipe 적용하기

저번 시간에 AWS Lambda에서 Pool을 지원하지 않는다는 사실을 알게 되었다. (자세한 과정은 여기 참고) Process와 Pipe를 이용해서 코드를 다시 짜라는 말에 너무나 슬펐지만... 완성하려면 모... 짜야지

kyumcoding.tistory.com

 

팀 내부에서 정 안 되면 Lambda 대신 EC2를 띄워서 코드를 돌리자는 얘기가 나왔지만, 

하루에 최대 30분만 돌리면 되는 상황에서 EC2를 띄우는 것은 너무너무너무 돈이 아까웠고 비효율적이라고 생각했다.

그리고 사실 문제가 해결 안 되고 포기해야 하는 상황이 너무 마음에 안 들었다 😇

 

문제가 해결 안 되는 상황에서 문득 든 생각... "Go가 그렇게 빠르다던데 그걸로 다시 짜볼까...?"

(1. 고루틴? 경량화 스레드? 그런거 모르고 진짜 빠르고 좋다는 말만 믿고 시작했다)

(2. 다시 생각해보니 파이썬 멀티스레드를 시도해봤던 것 같긴한데, 정확히 기억이 안 난다... 이래서 바로바로 기록해야 하는데)

 

 

이쯤되면 왜 파이썬 코루틴을 안 썼냐는 의문이 들을 수 있지만, 그때 당시에 나는 코루틴의 존재도 몰랐다... beautifulsoup과 selenium 스크래퍼 클론 코딩만 좀 해본 진짜 아무것도 모르는 사람이었다고 생각하면 편하다.

 

머리가 나쁘면 몸이 고생한다고,,, 코루틴이나 Ray를 사용하면 훨씬 더 빠르게 스크래퍼를 성공시킬 수 있었을텐데 아예 언어를 바꿔보는 무모한 짓을 시작해버렸다... 마침 시험 기간이라 잠깐 프로젝트가 2주 쉬는 타이밍이었는데, 이 때 완성시켜서 팀원들에게 자랑하기 위해 후다닥 나만의 작은 Go 언어 프로젝트를 시작했다.

 

우선 성능상 결론을 말하자면 대성공! 스크래퍼 2개에 대해서 아래와 같은 성능 향상을 이뤄낼 수 있었다.

처음 파이썬 코드에 비해 역 코드 스크래퍼는 약 50배, 시간표 스크래퍼의 경우 약 62배의 처리 속도 향상을 이뤄냈다. 속도도 속도지만, 경량화 스레드 덕분에 더 가볍게 돌아가 Lambda에서 OS에러를 내지 않는 점도 큰 장점이다. 

 

(하지만 전체적인 결과로 봤을 때 팀에서 Go로 짜여진 코드를 이해하고 고칠 수 있는 사람이 나밖에 없어서 여러모로 팀워크에 제약이 있었다는 일장일단의 조치였다)

 

 


 

여튼 서론이 길었다! 우선 노마드 코더 니콜라스 선생님의 Go 입문 강의를 통해 기본기를 익혔다(약 일주일). 사실 언어 자체가 심플한 것을 추구하기 때문에 새로 익히는데 시간이 그렇게 오래 걸리지는 않았다. 다만 Go를 설치하고, path 설정, 라이브러리를 import하는 과정이 빡돌았다; Go 홈페이지에서 다운받고, 뭔가 안 되는 것 같아서 vi 로 path 바꾸고, 근데 그거 안 되어서 brew로 다시 깔고, 안 되니까 JetBrain Goland로 한 번 더 깔고... 이러니까 path가 엉망이 되어서 결국 다 지우고 다시 깔았다. 다른 분들은 그냥 Go 웹페이지에서 다운 받는 것으로 쭉 밀고 가든지, 학생이라면 Goland만 쭉 쓰는 것을 추천한다.....

 

내가 리팩토링하려는 스크래퍼 2개 중 하나인 '역 코드 스크래퍼'는 정적인 웹페이지를 스크래핑해야 했다. 이는 마침 노마드 코드에서 익혔던 Goquery 라이브러리를 통해 충분히 구현 가능했다.

 

우선 코드의 흐름은 다음과 같다.

 

0. 코드는 AWS Lambda 위에서 

1. "https://pts.map.naver.com/end-subway/ends/web/{역 코드}/home?timemode="의 {역 코드}에 100~20000 숫자를 포함하여 쿼리를 날리고, 해당 페이지에 역 정보가 있으면 역 이름, 역 코드를 가져오고 없으면 패스한다.

2. 전체 결과를 하나의 json 파일로 만든다.

3. 만들어진 json 파일을 S3에 업로드 한다.

 

코드는 GitHub에서 확인 가능하고, 아래에서도 확인 가능하다.

  • scrapeNaverCode 함수가 goquery로 페이지를 스크래핑한다
  • run 함수는 scrapeNaverCode를 고루틴으로 제어함과 동시에 스크래핑해온 정보를 받아 알맞은 형태로 가공하는 역할을 한다
  • AWSConfigure 함수는 S3 사용을 위한 credential 설정, client 설정
  • S3Uploader는 map 형태의 data를 json으로 marshal하여 S3에 업로드하는 역할
  • HandleRequest는 위 4가지 함수들을 Lambda에서 실행시키기 위해 반드시 사용해야 할 포맷 같은 것
package main

import (
	"bytes"
	"context"
	"encoding/json"
	"fmt"
	"github.com/PuerkitoBio/goquery"
	"github.com/aws/aws-lambda-go/lambda"
	"github.com/aws/aws-sdk-go-v2/config"
	"github.com/aws/aws-sdk-go-v2/credentials"
	"github.com/aws/aws-sdk-go-v2/service/s3"
	"github.com/aws/aws-sdk-go/aws"
	"github.com/joho/godotenv"
	"log"
	"net/http"
	"os"
	"sort"
	"strconv"
	"sync"
	"time"
)

type extractedInfo struct {
	lineNum   string
	stationNm string
	naverCode int
}

type BucketBasics struct {
	S3Client *s3.Client
}

var INFO = map[string][]map[string]interface{}{}
var wg = new(sync.WaitGroup)

// AWS S3 사용을 위한 credential 설정 & client 생성
func AWSConfigure() BucketBasics {
	staticProvider := credentials.NewStaticCredentialsProvider(
		os.Getenv("AWS_BUCKET_ACCESS_KEY"),
		os.Getenv("AWS_BUCKET_SECRET_KEY"),
		"")

	sdkConfig, err := config.LoadDefaultConfig(
		context.Background(),
		config.WithCredentialsProvider(staticProvider),
		config.WithRegion(os.Getenv("AWS_REGION")),
	)
	checkErr(err)

	s3Client := s3.NewFromConfig(sdkConfig)
	bucketBasics := BucketBasics{s3Client}

	return bucketBasics
}

func run(num int) {
	var baseURL string = "https://pts.map.naver.com/end-subway/ends/web/"

	// 200개 돌며 네이버 코드 스크래핑 (feat. 고루틴)
	c := make(chan extractedInfo)
	for i := (num - 1) * 200; i < (num * 200); i++ {
		wg.Add(1)
		go scrapeNavercode(i, baseURL, c)
		wg.Done()
	}
	wg.Wait()

	// go 루틴에서 채널로 값 받아오기 & 받은 값을 이후에 처리하기 쉬운 형태로 가공
	for i := (num - 1) * 200; i < (num * 200); i++ {
		result := <-c
		lineNum := result.lineNum
		block := make(map[string]interface{})

		// 만약 받아온 값이 없다면 무시하고 아니라면 값 정리
		if result.stationNm == "" || result.lineNum == "" {
			continue
		} else {
			block["stationNm"] = result.stationNm
			block["naverCode"] = result.naverCode
		}

		// 만약 key에 호선이 없으면 새로운 key로 추가 후 정보 입력
		_, ok := INFO[lineNum]
		if ok == false {
			INFO[lineNum] = []map[string]interface{}{}
		}
		INFO[lineNum] = append(INFO[lineNum], block)

		// naverCode 기준으로 오름차순 정렬
		sort.Slice(INFO[lineNum], func(i, j int) bool {
			return INFO[lineNum][i]["naverCode"].(int) < INFO[lineNum][j]["naverCode"].(int)
		})
	}

}

func scrapeNavercode(code int, baseURL string, c chan<- extractedInfo) {
	pageURL := baseURL + strconv.Itoa(code) + "/home"

	// pageURL로 접속하기
	res, err := http.Get(pageURL)
	checkErr(err)
	checkCode(res)

	// 작업 끝나면 res.Body 닫아주는 명령 예약
	defer res.Body.Close()

	// html 읽기
	doc, err := goquery.NewDocumentFromReader(res.Body)
	checkErr(err)

	// 지하철 정보 파싱 후 채널로 보내기
	lineNum := doc.Find(".line_no").Text()
	stationNm := doc.Find(".place_name").Text()
	fmt.Println(code, " 확인완료")

	c <- extractedInfo{
		lineNum:   lineNum,
		stationNm: stationNm,
		naverCode: code}
}

// 에러 체킹용 함수 1
func checkErr(err error) {
	if err != nil {
		log.Fatalln(err)
	}
}

// 에러 체킹용 함수 2
func checkCode(res *http.Response) {
	if res.StatusCode != 200 {
		log.Fatalln("Request failed with Status:", res.StatusCode)
	}
}

func S3Uploader(INFO map[string][]map[string]interface{}, basics BucketBasics, fileName string) error {
	// data가 struct 형태일때는 이상하게 marshal이 되더니, map으로 바꾸니까 한방에 marshal이 잘 됨. 이유가 뭘까?
	content, err := json.MarshalIndent(INFO, "", " ")
	if err != nil {
		log.Fatalln("JSON marshaling failed: %s", err)
	}

	// json 바이트 스트림을 S3에 업로드
	_, err = basics.S3Client.PutObject(context.TODO(), &s3.PutObjectInput{
		Bucket: aws.String(os.Getenv("AWS_BUCKET_NAME")),
		Key:    aws.String(fileName),
		Body:   bytes.NewReader(content),
	})
	if err != nil {
		return fmt.Errorf("failed to upload, %v", err)
	}
	fmt.Println(fileName + "file successfully uploaded in S3")
	return nil
}

func HandleRequest(ctx context.Context) {
	start := time.Now()

	err := godotenv.Load()
	if err != nil {
		log.Fatal("Error loading .env file")
	}
	// 네이버 서버에 부담을 덜기 위해 batch로 나눠서 크롤링 진행 (batch 마다 5초 휴식)
	// tcp: broken pipe 에러를 피하기 위해 최대한 작게 배치 사이즈 설정
	for batchNum := 1; batchNum < 101; batchNum++ {
		fmt.Println(batchNum, "번째 배치 돌기 시작")
		// run 함수 내에서 wg.Wait()를 통해 동시 너무 많이 접속 시도를 하지 않도록 제어
		run(batchNum)
		fmt.Println(batchNum, "번째 배치 돌고, 5초 쉬기 시작")
		time.Sleep(time.Second * 5)
	}

	// 크롤링 결과 파일로 저장하기
	fileName := "subway_information.json"

	end := time.Since(start)
	fmt.Println("총 실행 시간 : ", end)

	// 저장한 json 파일 s3에 업로드
	bucktBasics := AWSConfigure()
	S3Uploader(INFO, bucktBasics, fileName)
}

func main() {
	lambda.Start(HandleRequest)
}

 


글이 길어지니까 이번 글에서는 HandleRequest와 main 부분만 설명한다

 

HandleRequest & main

 

우선 가장 첫번째 부분인 HandleRequerst와 main. 사실상 main은 HandleRequest를 실행시키기 위한 수단에 지나지 않기 때문에 HandleRequest만 설명한다. (Lambda 함수 코드의 엔트리포인트. lambda.Start(HandleRequest)를 추가하면 됨)

func HandleRequest(ctx context.Context) {
	start := time.Now()

	err := godotenv.Load()
	if err != nil {
		log.Fatal("Error loading .env file")
	}
	// 네이버 서버에 부담을 덜기 위해 batch로 나눠서 크롤링 진행 (batch 마다 5초 휴식)
	// tcp: broken pipe 에러를 피하기 위해 최대한 작게 배치 사이즈 설정
	for batchNum := 1; batchNum < 101; batchNum++ {
		fmt.Println(batchNum, "번째 배치 돌기 시작")
		// run 함수 내에서 wg.Wait()를 통해 동시 너무 많이 접속 시도를 하지 않도록 제어
		run(batchNum)
		fmt.Println(batchNum, "번째 배치 돌고, 5초 쉬기 시작")
		time.Sleep(time.Second * 5)
	}

	// 크롤링 결과 파일로 저장하기
	fileName := "subway_information.json"

	end := time.Since(start)
	fmt.Println("총 실행 시간 : ", end)

	// 저장한 json 파일 s3에 업로드
	bucktBasics := AWSConfigure()
	S3Uploader(INFO, bucktBasics, fileName)
}

func main() {
	lambda.Start(HandleRequest)
}

 

우선 HandleRequest 부분을 보면 ctx context.Context가 있는데, 이때 context는 Lambda 함수 호출을 위한 런타임 정보를 제공하기 위해 사용된다 (자세한 설명은 공식 문서 참고)

func HandleRequest(ctx context.Context) { }

 

 

시간 측정을 위한 코드 (자세한 사용 방법은 여기를 참고)

start := time.Now()

end := time.Since(start)
fmt.Println("총 실행 시간 : ", end)

 

.env 파일에서 환경변수를 가져오기 위한 코드 (자세한 사용 방법은 여기를 참고)

err := godotenv.Load()
if err != nil {
	log.Fatal("Error loading .env file")
}

 

결국 내 코드는 100~20000까지 19900번 네이버 서버에 요청을 날려야 하는 것. 단시간 안에 너무 많은 요청을 날리면 안 되기 때문에 batch를 나누고 각 batch마다 5초씩 sleep 하기로 함. 19900개를 몇 개의 batch로 나눌 것인지는 실험을 통해 알아냈는데, (고루틴을 통해) 한 번에 너무 많이 병렬처리를 시도하면 lambda에서 tcp: broken 에러가 나오고, 너무 적게 처리하면 timeout에 간당간당해짐. 

 

아무리 고루틴이더라도 lambda에서 너무 많이 돌리면 broken pipe가 발생하긴 하더라

 

실험을 통해 각 배치 당 200개로 설정하는 것이 무난하다는 것을 발견. 배치당 200개의 역씩, 100배치 즉, 20000까지 돌게 하는 코드 작성. (run은 스크래핑을 담당하는 코드)

// 네이버 서버에 부담을 덜기 위해 batch로 나눠서 크롤링 진행 (batch 마다 5초 휴식)
// tcp: broken pipe 에러를 피하기 위해 최대한 작게 배치 사이즈 설정

for batchNum := 1; batchNum < 101; batchNum++ {
	fmt.Println(batchNum, "번째 배치 돌기 시작")
	// run 함수 내에서 wg.Wait()를 통해 동시 너무 많이 접속 시도를 하지 않도록 제어
	run(batchNum)
	fmt.Println(batchNum, "번째 배치 돌고, 5초 쉬기 시작")
	time.Sleep(time.Second * 5)
}

 

 

다음 글에서는 run과 scrapeNaverCode 함수를 자세히 뜯어보며 goquery 사용법과 syn.WaitGroup를 이용한 goroutine 제어에 대해서 적겠다. (그 다음 글에는 AWS Configure와 S3 업로드에 대해서 알아볼 예정! 하나하나 엄청 고생하면서 했어서, 조금 더 자세하게 남기고 싶은 욕심이 있어 끊어간다)