저번 시간에 AWS Lambda에서 Pool을 지원하지 않는다는 사실을 알게 되었다. (자세한 과정은 여기 참고)
Process와 Pipe를 이용해서 코드를 다시 짜라는 말에 너무나 슬펐지만... 완성하려면 모... 짜야지...
내가 운영체제에 대한 이해가 깊지 않다보니 process와 pipe를 이해하는데 있어 너무나도 많은 노력이 필요했다.
우선 전체적인 process의 사용방법은 아래와 같다
import multiprocessing
from multiprocessing import Process
def practice(arg):
print('multiprocessing is so hard')
p1 = Process(target=practice, args='1') # 프로세스 인스턴스 생성
p1.start() # 프로세스 인스턴스 돌리기
p1.join() # join을 사용할 경우 앞서 실행한 프로세스가 종료될 때까지 기다림
우선 process 인스턴스를 만들고, 멀티프로세싱을 적용할 함수 등을 target에 적는다 (이때 practice()가 아니라 practice 꼴로 적어야 한다). 뒤에는 인자로 넣어야 하는 것들을 적어넣는데, 여기에 range를 이용하여 for in range 같은 것을 구현하기도 하는 것 같다. 이후에 등장할 pipe도 여기에 넣어서 프로세스 간 값을 넘겨주는데 사용하기도 한다.
이후에 start를 통해서 멀티프로세싱을 시작하고, join을 통해 start로 시작한 프로세스가 종료될 때까지 기다리고, 프로세스를 종료시킨다.
pipe
솔직히 multiprocessing 라이브러리에 있는 pipe가 운영체제에 나오는 그 pipe와 완전히 똑같은 것인지는 잘 모르겠다. 하지만, 프로세스 간 값을 주고받는 것, 프로세스 간 통신을 한다는 점에서 둘은 비슷한 컨셉의 도구이거나 같은 것이라고 이해를 했다. (pip install pipe가 따로 있는 것을 보니 그냥 컨셉만 비슷한 것이라고 이해하는 것이 좋겠다)
우선 가장 설명이 잘 되어 있는 자료는 (역시나) 모 인도인 유튜버씨의 설명... https://www.youtube.com/watch?v=5ff4_YK83fQ
from multiprocessing import Process, Pipe
elements = ['h', 'e', 'l', 'l', 'o']
def send_hello(conn, args):
for element in elements:
conn.send(element)
conn.close()
def recv_hello(conn):
for _ in range(len(elements)):
print(conn.recv())
parent_conn, child_conn = Pipe() # pipe는 2개의 connection object를 반환
# 반환되는 object들에 위계는 없으나 편의상 parent, child 이름을 붙여줌
# 프로세스 인스턴스 생성
p1 = Process(target=send_hello, args=(parent_conn, elements))
p2 = Process(target=recv_hello, args=(child_conn,))
# 프로세스 인스턴스 돌리기 & 종료
p1.start()
p2.start()
p1.join()
p2.join()
h
e
l
l
o
파이프로 하는 것은 동기화에 대한 보장이 없다고..? 진짠가? 이걸 어떻게 보장할 수 있을까를 좀 더 고민해봐야겠네
여튼 이런 것들은 고민해보고 적용해서 이래저래 코드 고치고, chatgpt씨에게도 물어보고 하여... 다음과 같은 코드가 나왔다
import requests
from bs4 import BeautifulSoup
import json
import time
from time import sleep
from multiprocessing import Process, Pipe
def scrape_subway_info(naver_code, conn):
base_query = f"https://pts.map.naver.com/end-subway/ends/web/{naver_code}/home"
page = requests.get(base_query)
soup = BeautifulSoup(page.text, "html.parser")
try:
line_num = soup.select_one('body > div.app > div > div > div > div.place_info_box > div > div.p19g2ytg > div > button > strong.line_no').get_text()
station_nm = soup.select_one('body > div.app > div > div > div > div.place_info_box > div > div.p19g2ytg > div > button > strong.place_name').get_text()
conn.send((line_num, station_nm, naver_code))
except:
conn.send(None)
def find_code():
INFO = {}
processes = []
connections = []
for naver_code in range(100, 20000):
parent_conn, child_conn = Pipe()
p = Process(target=scrape_subway_info, args=(naver_code, child_conn))
p.start()
processes.append(p)
connections.append(parent_conn)
for i, conn in enumerate(connections):
res = conn.recv()
if res is None:
continue
line_num, station_nm, naver_code = res
if line_num not in INFO:
INFO[line_num] = []
block = {"station_nm": station_nm, "naver_code": naver_code}
INFO[line_num].append(block)
for p in processes:
p.join()
return INFO
if __name__ == "__main__":
start = time.time()
with open('subway_information.json', 'w', encoding='utf-8') as f:
json.dump(find_code(), f, ensure_ascii=False, indent=4)
end =time.time()
print(f"{end - start:.5f} sec")
너무 복잡하지만 여튼 간략히 설명하자면
1) child_conn 상에서 scrape_subway_info에서 단건의 크롤링을 진행하고
2) 그것을 send()를 통해 parent_conn으로 보내기
3) parent_conn으로 보내진 값이 none인지 확인하고, 아니라면 info에 담는
그런 코드가 완성되었다.
우선 크롤링 범위를 100, 1000까지 잡고 실행했더니... 너무나 감격스럽게 돌아갔다 🥰
하지만... 뒤이에 100, 20000까지 본격적으로 돌려봤더니 나오고 만 그 에러
OSError: [Errno 24] Too many open files
하,,, 이거 해결하려고 정말 검색을 열심히 했다.
가장 빈번하게 나온 해결방법은 limit.conf 파일 수정하는 방법이었다. (ulimit 늘리는 그 방법) 이 방법은 빠르게 해결할 수 있으나, 근본적인 해결책이 아니라고 생각했다. 실제 lambda에서 os.system("ulimit -n 2000") 방법이 안 먹히기도 해서, 이 방법은 사용할 수 없었다
너무 많은 파일을 열고 있다는 것은, 한꺼번에 너무 많은 프로세스가 실행된다는 의미라고 생각했다. 그렇다면, 한 번에 실행되는 프로세스의 수를 줄여주면 되지 않을까? 즉,
1) 멀티프로세싱을 거는 프로세스의 숫자를 제한할 수 있도록 범위를 잘 조정하고,
2) 한 번 걸어놓은 프로세스가 종료될때까지 기다린 후에 다음 범위에 대해 크롤링을 돌리는 것이다
이러한 생각을 바탕으로 아래와 같이 코드를 변경하였다
import requests
from bs4 import BeautifulSoup
import json
import time
from time import sleep
from multiprocessing import Process, Pipe
import os
INFO = {}
def scrape_subway_info(naver_code, conn):
base_query = f"https://pts.map.naver.com/end-subway/ends/web/{naver_code}/home"
page = requests.get(base_query)
soup = BeautifulSoup(page.text, "html.parser")
try:
line_num = soup.select_one('body > div.app > div > div > div > div.place_info_box > div > div.p19g2ytg > div > button > strong.line_no').get_text()
station_nm = soup.select_one('body > div.app > div > div > div > div.place_info_box > div > div.p19g2ytg > div > button > strong.place_name').get_text()
conn.send((line_num, station_nm, naver_code))
except:
conn.send(None)
def find_code(start, end):
global INFO
processes = []
connections = []
for naver_code in range(start, end):
parent_conn, child_conn = Pipe()
p = Process(target=scrape_subway_info, args=(naver_code, child_conn))
p.start()
processes.append(p)
connections.append(parent_conn)
for i, conn in enumerate(connections):
res = conn.recv()
if res is None:
continue
print(res)
line_num, station_nm, naver_code = res
if line_num not in INFO:
INFO[line_num] = []
block = {"station_nm": station_nm, "naver_code": naver_code}
INFO[line_num].append(block)
for p in processes:
p.join()
def run():
target = [(100,5000), (5001, 10000), (10001, 15000), (15001, 20000)]
for start, end in target:
find_code(start, end)
global INFO
return INFO
if __name__ == "__main__":
start = time.time()
with open('subway_information.json', 'w', encoding='utf-8') as f:
json.dump(run(), f, ensure_ascii=False, indent=4)
end =time.time()
print(f"{end - start:.5f} sec")
범위를 100~20000까지 한 번에 주는 것이 아니라, 5000개씩 끊어서 주는 것이었다. 100~5000까지 멀티프로세싱으로 실행하고, 이 범위 내에서 모든 작업이 끝날 때까지 기다린 이후에 5001~10000까지 범위를 실행하는 방식이었다.
이렇게 되니 중간에 OSError가 나지 않고 우선 로컬에서는 끝까지 잘 돌아가는 것이었다!!!!
CPU야.... 미안해...
이렇게 실행해서 lambda에 배포까지 했는데... 결과는 또 실패....
lambda 상에서는 똑같이 too many open files 에러가 떴다
흐으으음... 내가 사용하는 컴퓨터 하드웨어 스펙(m1 pro)과는 다른 환경이라 그런가 한 번에 돌릴 수 있는 파일 수가 좀 더 제한적인 것 같았다. 방법은 알았으니, 위에 코드에서 비효율적인 부분을 조금 더 고치고 범위도 수정해서 진행해보면 좋을 것 같았다. (2000 단위로 해봤는데도 too many open files가 나왔다)
번외
lambda의 성능을 높여 사용하기 위해 문서들을 정말 열심히 찾아본 것 같다. 내가 시도해본 방법은 크게
1) timeout 늘리기
혹시 너무 빠르게 lambda의 실행이 실패했고, 로그를 보니 원인이 timeout이라고 적혀있나요...? 한 번 일반 구성 - 제한 시간을 살펴보세요... 괜히 람다 함수 만지지 말고 구성을 보시는 것이 가장 정확한 해결방법일 수 있답니다...
람다는 완전 바로 사용자가 원하는 코드 동작을 실행하지 못한답니다. 람다 함수를 실행하기 위한 세팅들을 준비하는 시간이 있어야 하고, 이로 인해 몇 초 이상의 코드 딜레이가 발생하기 때문이죠 ^^ (찾아보니 이를 cold start라고 부름) 내 코드의 경우: 도커 컨테이너가 띄워지고 -> 그 컨테이너에서 python 환경 세팅 -> 깃 설치 및 클론 -> 람다 함수 실행 등으로 이루어지기 때문에 3초는 턱없이 부족한 시간. 이 시간을 최대 15분까지 늘릴 수 있으니 늘려서 사용해보도록 합시다.
2) 메모리 늘리기
timeout과 더불어 메모리도 늘릴 수 있음. 메모리를 올리면 인스턴스의 사양이 올라가 처리 속도 자체가 빨라진다고 함. 나의 경우에도 128MB에서 아예 안 돌아가던 것이 1024MB로 올리니 그래도 어느정도까지 돌아가다가 too many files open 에러를 내뱉는 수준까지는 올렸음. (3008이 최대라 거기까지 올렸지만 계속 too many files open이 나왔다...)
3) 예약된 동시성 (이건 상관없는듯)
https://docs.aws.amazon.com/ko_kr/lambda/latest/dg/configuration-concurrency.html
aws의 동시성과 관련해서 찾아보다보니 예약된 동시성 구성이라는 것이 있었다. 처음에는 동시성??? 하면서 눈이 돌아가면서 예약 가능한 갯수 한도를 790개까지 늘려버렸다. 이건 한 번에 바로 늘릴 수 있는게 아니라 aws에 직접 신청하고, 메일받고 뭐 그런 절차가 있었다. 두근두근 하면서 이걸 늘려서 실험해봤는데... 전혀 뭐가 달라지지 않았다.
천천히 다시 문서를 읽어보니, 그냥 하나의 함수에서 실행할 수 있는 동시성을 제한하는 그런 기능에 가까워보였다. (예를 들어 함수 a에서는 300개까지만 쓰게 하고, 더 중요한 함수 b에서 더 많이 자원을 쓸 수 있게 하는 고런 느낌?) 온디맨드로 새로운 인스턴스를 생성해야 해서 콜드 스타트 지연 시간 발생 방지도 안 된다고 적혀있었다... 하핳... 그래도 새로운 기능과 문서를 읽어봤으니 그것으로 만족
좀 더 찾아보니 lambda에서 함수 크기 조정, 동시성과 관련한 좋은 공식 문서가 있는 것 같아서 한 번 읽어보면 좋을 것 같다
https://docs.aws.amazon.com/ko_kr/lambda/latest/dg/lambda-concurrency.html