Map Reduce
From Big Data Analytics Lab
MapReduce: Simplified Data Processing on Large Clusters
Jeffrey Dean and Sanjay Ghemawat. MapReduce: Simplified Data Processing on Large Clusters. Sixth Symposium on Operating System Design and Implementation (OSDI '04), San Francisco, CA, December, 2004.
Introduction
- Google에는 방대한 양의 raw data를 가공해서 필요한 결과를 만들어야 하는 연산들이 많이 존재.
- input: 수집된 문서들, 웹 요청 로그 등
- output: inverted indices, 웹 문서들 간의 그래프 구조 표현, host 당 수집된 페이지 수 정보, 특정 기간동안 가장 빈번한 질의 등
- 원래의 계산 자체는 straightforward한데 데이터 분산, 병렬 계산, 실패 처리, 부하 분산 등으로 인해 문제가 복잡해짐.
- 이러한 복잡성을 숨기는 새로운 abstraction 고안.
- LISP 등 functional language의 map/reduce 함수로부터 영향.
- 대부분의 연산이 input의 각 logical "record"에 map 연산을 적용해서 중간 결과 key/value 집합을 만든 뒤, 같은 key 값의 모든 value들에 대해 reduce 연산을 적용해서 합산한 결과를 얻을 수 있겠더라.
- 병렬화도 쉽고 연산이 실패한 경우에도 map/reduce 연산을 재실행하기만 하면 됨.
- 자동으로 병렬 처리 및 대규모 연산을 분산해 주는 단순하고 강력한 인터페이스 탄생. MapReduce.
Programming Model
사용자는 하고자 하는 연산을 Map, Reduce 2가지 함수로 표현하면 됨.
- Map: input key/value pair들을 받아서 중간 key/value pair를 생성.
- MapReduce 라이브러리는 같은 key 값의 모든 중간 결과를 묶어서 Reduce 함수로 보냄.
- Reduce: 특정 중간 key 값을 갖는 value들을 input으로 받아서 이를 합하여 결과 값(들의 집합)을 생성.
- 중간 값들은 iterator를 통해 reduce 함수로 공급. 메모리에 담을 수 있는 정도의 크기로 보냄.
- Map함수의 input key, value 타입은 사용자가 프로그램에서 지정.
Example - word counting
- map/reduce 함수 이외에, input/output 파일명, 부가적인 tuning 파라미터 등 실행에 필요한 코드 작성해야 함.
Types
- map (k1, v1) -> list(k2, v2) : map의 input과 output key/value의 domain은 달라짐.
- reduce (k2, list(v2)) -> list (v2) : reduce의 input과 output key/value의 domain은 같음.
More Examples
- distributed grep
- map - input: 문서 / output: 지정한 패턴이 있는 라인
- reduce - output: 중간 결과 라인 그대로 copy
- count of URL access frequency (word count랑 비슷)
- map - input: 웹 페이지 요청 로그 / output: <URL, 1>
- reduce - output: <URL, total count>
- reverse web-link graph
- map - input: source 웹 페이지 / output: <target, source> (target은 source 내에서 찾은 link의 URL)
- reduce - output: <target, list(source)>
- term-vector per host
- term-vector: <word, frequency> pair의 리스트.
- map - input: 웹 문서 / output: <hostname, term vector> (hostname은 URL에서 추출)
- reduce - output: <hostname, term vector> 모든 term vector에서 자주 나오는 중요한 term만 남김.
- inverted index
- map - input: 문서 / output: <word, document ID>
- reduce - output: <word, list(document ID)>
- distributed sort
- map - output: <key, record> input의 각 record로부터 key를 추출.
- reduce - output: 중간 결과 그대로. reduce의 input으로 중간 결과를 보내기 전에 partitioning & 중간 결과 sort 됨.
Implementation
- 환경에 따라 MapReduce 인터페이스 구현이 달라질 수 있다.
- Google은 다음의 환경에 맞추어 구현.
- 각 머신은 2~4GB 메모리, 듀얼 프로세서 x86 프로세서를 갖는 리눅스 머신이다.
- 시중의 네트워크 하드웨어 사용 - 100Mb/s 또는 1Gb/s
- 클러스터는 수백, 수천 대의 머신으로 이루어지므로 failure는 일상적이다.
- 저장소도 비싸지 않은 IDE 디스크 사용. (파일 시스템에서 replication을 통해 availability, reliability 제공)
- 사용자는 스케쥴링 시스템에 job을 요청. 각 job은 여러 task로 나뉘고 스케쥴러가 클러스터의 여러 머신에 나누어줌
Execution Overview
- map: input은 자동으로 M개의 split으로 나뉘고, 그에 따라 병렬 호출.
- reduce: 중간 key를 파티셔닝 함수를 이용하여 R개로 중간 결과를 나누고 병렬 호출. 파티셔닝 함수와 파티션 개수(R)는 사용자가 지정.
- 사용자 프로그램에 링크된 MapReduce 라이브러리가 input 파일들을 M개의 split(보통 16~64MB)으로 나눈다. (옵션으로 조정 가능) 클러스터의 머신들에 다수의 프로그램 copy를 띄운다.
- 프로그램 copy들 중 하나가 master가 되고 나머지는 master에게 일을 할당받는 worker가 된다. master는 idle 상태의 worker에게 map task 또는 reduce task를 할당하는 역할을 한다.
- map task를 할당받은 worker는 해당하는 input split의 내용을 읽고 key/value pair를 parsing해서 사용자가 정의한 Map 함수에 넘겨준다. Map 함수의 output인 중간 결과는 메모리 버퍼에 둔다.
- 주기적으로 버퍼에 있는 중간 결과를 local disk에 쓰는데, partitioning 함수에 따라 R개의 region으로 나누어 쓴다. local disk에 쓰인 중간 결과의 위치는 마스터에게 전달된다.
- reduce worker는 master가 이 중간 결과의 위치를 알려주면, RPC를 수행하여 map worker의 local disk로부터 중간 결과를 읽는다. 모든 중간 결과를 다 읽으면 key 값에 따라 정렬하여 같은 key 값을 갖는 결과를 모은다. (한 reduce task가 여러 개의 key 값을 가진 데이터를 처리할 수도 있으므로)
- reduce worker는 정렬된 중간 결과를 각 key 값에 해당하는 중간 결과를 Reduce 함수에 넘겨준다. Reduce 함수의 output은 해당 reduce 파티션에 대한 최종 output file에 append된다.
- 모든 map task와 reduce task가 끝나면 master는 사용자 프로그램을 깨운다.
- 성공적으로 MapReduce가 수행되고 나면 R개의 output file이 남는다. (각 reduce task 당 하나)
- output이 다시 다른 MapReduce 또는 분산 어플리케이션의 input으로 사용되기도 하므로 꼭 하나로 모을 필요는 없다.
Master Data Structures
- master가 유지하는 자료 구조
- 각 map/reduce task의 상태: idle, in-progress, completed
- worker 머신의 identity
- map task의 output 중간 결과의 위치
- reduce worker에 전달됨
- master는 map task가 끝날 때 중간 결과의 위치와 크기 정보에 대한 update를 받고 in-progress 상태의 reduce task를 가진 worker에게 incremental하게 push한다.
Fault Tolerance
Worker Failure
- master가 주기적으로 모든 worker에게 ping을 날림. 일정 시간 지나도록 응답이 없으면 worker를 failed로 표시.
- 해당 worker에 의해 수행된 map task: in-progress, completed 상태인 task를 idle 상태로
되돌려서 다른 worker에게 할당.
- completed라도 다시 수행하는 이유 - map task의 중간 결과는 worker의 local disk에 쓰여 있어서 접근하지 못하므로.
- 해당 worker에 의해 수행된 reduce task: in-progress 상태인 task만 idle 상태로 되돌려서 다른
worker에게 할당.
- reduce task의 output은 최종 output file에 쓰이므로 완료된 것은 괜찮음.
- 다시 수행하기만 하면 되니까 매우 견고하다.
Master Failure
- master의 자료구조를 주기적으로 checkpoint. 죽었을 경우 ckpt부터만 다시 하면 됨.
- 실제 구현은 master가 죽으면 그냥 전체 실행 실패. 사용자가 다시 재실행.
- master는 딱 하나이기 때문에 죽을 가능성이 거의 없음. (하나 중에 하나 죽는 가능성 vs. 수백개 중에 하나 죽는 가능성)
Semantics in the Presence of Failures
- map/reduce 함수가 deterministic하면, MapReduce 프로그램의 결과는 전체 프로그램을 sequential하게 수행한 것과 결과가 같다.
- non-deterministic한 경우에도, 좀 약하긴 하지만 괜찮은 semantics를 가짐.
- 어떤 reduce task R1의 output은 프로그램을 sequential하게 수행한 어떤 R1의 output과 같다.
- 어떤 reduce task R2의 output은 프로그램을 sequential하게 수행한 어떤 R2의 output과 같다.
- 하지만, 두 sequential 수행은 서로 다르다는 거..
Locality
- 클러스터에서는 network bandwidth가 중요한 자원임.
- GFS가 input data를 여러 머신의 local disk에 복제해 놓음.
- master가 이 위치 정보를 이용해서 map task를 input data의 replica가 존재하는 머신 혹은 가까운 머신(같은 스위치 내)에 할당 시도.
Task Granularity
- 이상적으로는 map task의 개수(M)와 reduce task의 개수(R)가 훨씬 커야 좋다.
- dynamic load balancing, fast recovery 때문
- 제약
- master의 scheduling decision: O(M+R)
- master가 메모리에 유지해야 하는 자료구조 크기: O(M*R) (map output이 R개라서인듯)
- 보통 M은 locality를 위해 한 input data가 16~64MB가 되도록 정함. (GFS의 chunk 크기에
맞추겠지.) R은 worker의 몇배 정도..(output 파일의 개수이기도 함)
- e.g., M=200,000 / R=5,000 / worker = 2,000
Backup Tasks
- 이상하게 느린 머신("straggler") 때문에 마지막 몇 개의 map/reduce task가 오래 걸려서 전체 수행 시간이
길어지는 경우가 발생.
- bad disk, 머신 설정(cache가 꺼졌다던가..) 등 때문에 이런 일들이 발생할 수 있음.
- 전체 과정이 거의 끝나갈 무렵이 되면, 이미 수행 중인 task라도 backup으로 다른 worker에게 할당해서 수행시킴. 아무 worker나 먼저 끝내면 그걸로 완료시킴.
Refinements
Partitioning Function
- 사용자가 reduce task의 개수(= output file의 개수) R을 지정.
- 중간 결과의 key에 partitioning function을 사용해서 나누게 됨.
- default: hash(key) mod R, 필요에 따라 바꿔서 사용 가능.
Ordering Guarantees
- 특정 partition 내에서는 중간 key/value pair들이 오름차순으로 처리됨.
- partition별로 정렬된 output file을 생성하기 쉬움.
Combiner Function
- map task에 의해 생성된 중간 결과에는 key가 중복됨.
- reduce 함수가 commutative & associative한 경우 중간 결과를 merge해서 보낼 수 있음.
- word count example: <the, 1>, <the, 1>, <the, 1>, <the, 1> -> <the, 4>
- combiner 함수는 보통 reduce 함수와 동일. output이 최종 output file에 쓰이지 않고 중간 결과로 다시 reduce task의 input이 된다는 점이 다름.
Input and Output Types
- input type: text file에서 offset으로 key & value 지정. key/value sequence 등.
- 사용자가 reader interface를 구현하면 새로운 input type 지원됨.
- 꼭 file일 필요는 없음. DB로부터 record를 읽는다던가 메모리에 있는 자료구조에서 읽는다던가...
- output도 마찬가지...
Side-effects
- Map, reduce operator에서 aux file을 생성하는게 유용할때도 있다.
- 그런데 이 경우 programmer가 atomic하고 idempotent하게 수행되도록 잘 생각해야 한다.
- temp file을 쓰고 마지막에 rename을 한다던지...
- 여러 파일을 output 하려면 2-phase commit을 해야 되는데 그런건 제공하지 않는다.
- Q) 왜 2-phase commit을 해야되나?
Skipping Bad Records
- Map, Reduce function이 특정 record를 처리 할 때 버그로 인해 죽는 경우, 해당 record만 넘길 수 있도록 하는 실행 모드가 있다.
- signal handler를 이용해 user code가 죽으면 signal handler가 master에게 packet을 보낸다.
- master는 다음에 재 실행할때에 그 record를 건너 뛰라고 지시한다.
Local Execution
- 디버깅을 위해 local에서 실행해 볼 수 있음
Status Information
- master가 web server를 통해 status page를 제공
- 몇개의 task가 끝났나, 혹은 진행중인가.
- 중간 데이터의 크기
- 진행 속도 등
Counters
- user code에서 사용할 수 있는 counter 제공
- 각 worker들은 ping msg를 통해 master에게 counter 정보를 보내고, master가 취합해 종료후 알려준다.
- 재실행으로 인해 counter가 중복 계산 되지 않도록 master가 잘 취합한다.