Map Reduce

From Big Data Analytics Lab

MapReduce: Simplified Data Processing on Large Clusters

Jeffrey Dean and Sanjay Ghemawat. MapReduce: Simplified Data Processing on Large Clusters. Sixth Symposium on Operating System Design and Implementation (OSDI '04), San Francisco, CA, December, 2004.

Introduction

  • Google에는 방대한 양의 raw data를 가공해서 필요한 결과를 만들어야 하는 연산들이 많이 존재.
    • input: 수집된 문서들, 웹 요청 로그 등
    • output: inverted indices, 웹 문서들 간의 그래프 구조 표현, host 당 수집된 페이지 수 정보, 특정 기간동안 가장 빈번한 질의 등
  • 원래의 계산 자체는 straightforward한데 데이터 분산, 병렬 계산, 실패 처리, 부하 분산 등으로 인해 문제가 복잡해짐.
  • 이러한 복잡성을 숨기는 새로운 abstraction 고안.
    • LISP 등 functional language의 map/reduce 함수로부터 영향.
    • 대부분의 연산이 input의 각 logical "record"에 map 연산을 적용해서 중간 결과 key/value 집합을 만든 뒤, 같은 key 값의 모든 value들에 대해 reduce 연산을 적용해서 합산한 결과를 얻을 수 있겠더라.
    • 병렬화도 쉽고 연산이 실패한 경우에도 map/reduce 연산을 재실행하기만 하면 됨.
  • 자동으로 병렬 처리 및 대규모 연산을 분산해 주는 단순하고 강력한 인터페이스 탄생. MapReduce.

Programming Model

사용자는 하고자 하는 연산을 Map, Reduce 2가지 함수로 표현하면 됨.

  • Map: input key/value pair들을 받아서 중간 key/value pair를 생성.
  • MapReduce 라이브러리는 같은 key 값의 모든 중간 결과를 묶어서 Reduce 함수로 보냄.
  • Reduce: 특정 중간 key 값을 갖는 value들을 input으로 받아서 이를 합하여 결과 값(들의 집합)을 생성.
  • 중간 값들은 iterator를 통해 reduce 함수로 공급. 메모리에 담을 수 있는 정도의 크기로 보냄.
  • Map함수의 input key, value 타입은 사용자가 프로그램에서 지정.

Example - word counting

Example - word counting

  • map/reduce 함수 이외에, input/output 파일명, 부가적인 tuning 파라미터 등 실행에 필요한 코드 작성해야 함.

Types

  • map (k1, v1) -> list(k2, v2)  : map의 input과 output key/value의 domain은 달라짐.
  • reduce (k2, list(v2)) -> list (v2)  : reduce의 input과 output key/value의 domain은 같음.

More Examples

  • distributed grep
    • map - input: 문서 / output: 지정한 패턴이 있는 라인
    • reduce - output: 중간 결과 라인 그대로 copy
  • count of URL access frequency (word count랑 비슷)
    • map - input: 웹 페이지 요청 로그 / output: <URL, 1>
    • reduce - output: <URL, total count>
  • reverse web-link graph
    • map - input: source 웹 페이지 / output: <target, source> (target은 source 내에서 찾은 link의 URL)
    • reduce - output: <target, list(source)>
  • term-vector per host
    • term-vector: <word, frequency> pair의 리스트.
    • map - input: 웹 문서 / output: <hostname, term vector> (hostname은 URL에서 추출)
    • reduce - output: <hostname, term vector> 모든 term vector에서 자주 나오는 중요한 term만 남김.
  • inverted index
    • map - input: 문서 / output: <word, document ID>
    • reduce - output: <word, list(document ID)>
  • distributed sort
    • map - output: <key, record> input의 각 record로부터 key를 추출.
    • reduce - output: 중간 결과 그대로. reduce의 input으로 중간 결과를 보내기 전에 partitioning & 중간 결과 sort 됨.

Implementation

  • 환경에 따라 MapReduce 인터페이스 구현이 달라질 수 있다.
  • Google은 다음의 환경에 맞추어 구현.
  1. 각 머신은 2~4GB 메모리, 듀얼 프로세서 x86 프로세서를 갖는 리눅스 머신이다.
  2. 시중의 네트워크 하드웨어 사용 - 100Mb/s 또는 1Gb/s
  3. 클러스터는 수백, 수천 대의 머신으로 이루어지므로 failure는 일상적이다.
  4. 저장소도 비싸지 않은 IDE 디스크 사용. (파일 시스템에서 replication을 통해 availability, reliability 제공)
  5. 사용자는 스케쥴링 시스템에 job을 요청. 각 job은 여러 task로 나뉘고 스케쥴러가 클러스터의 여러 머신에 나누어줌

Execution Overview

  • map: input은 자동으로 M개의 split으로 나뉘고, 그에 따라 병렬 호출.
  • reduce: 중간 key를 파티셔닝 함수를 이용하여 R개로 중간 결과를 나누고 병렬 호출. 파티셔닝 함수와 파티션 개수(R)는 사용자가 지정.

Execution Overview

  1. 사용자 프로그램에 링크된 MapReduce 라이브러리가 input 파일들을 M개의 split(보통 16~64MB)으로 나눈다. (옵션으로 조정 가능) 클러스터의 머신들에 다수의 프로그램 copy를 띄운다.
  2. 프로그램 copy들 중 하나가 master가 되고 나머지는 master에게 일을 할당받는 worker가 된다. master는 idle 상태의 worker에게 map task 또는 reduce task를 할당하는 역할을 한다.
  3. map task를 할당받은 worker는 해당하는 input split의 내용을 읽고 key/value pair를 parsing해서 사용자가 정의한 Map 함수에 넘겨준다. Map 함수의 output인 중간 결과는 메모리 버퍼에 둔다.
  4. 주기적으로 버퍼에 있는 중간 결과를 local disk에 쓰는데, partitioning 함수에 따라 R개의 region으로 나누어 쓴다. local disk에 쓰인 중간 결과의 위치는 마스터에게 전달된다.
  5. reduce worker는 master가 이 중간 결과의 위치를 알려주면, RPC를 수행하여 map worker의 local disk로부터 중간 결과를 읽는다. 모든 중간 결과를 다 읽으면 key 값에 따라 정렬하여 같은 key 값을 갖는 결과를 모은다. (한 reduce task가 여러 개의 key 값을 가진 데이터를 처리할 수도 있으므로)
  6. reduce worker는 정렬된 중간 결과를 각 key 값에 해당하는 중간 결과를 Reduce 함수에 넘겨준다. Reduce 함수의 output은 해당 reduce 파티션에 대한 최종 output file에 append된다.
  7. 모든 map task와 reduce task가 끝나면 master는 사용자 프로그램을 깨운다.
  • 성공적으로 MapReduce가 수행되고 나면 R개의 output file이 남는다. (각 reduce task 당 하나)
  • output이 다시 다른 MapReduce 또는 분산 어플리케이션의 input으로 사용되기도 하므로 꼭 하나로 모을 필요는 없다.

Master Data Structures

  • master가 유지하는 자료 구조
    • 각 map/reduce task의 상태: idle, in-progress, completed
    • worker 머신의 identity
    • map task의 output 중간 결과의 위치
      • reduce worker에 전달됨
      • master는 map task가 끝날 때 중간 결과의 위치와 크기 정보에 대한 update를 받고 in-progress 상태의 reduce task를 가진 worker에게 incremental하게 push한다.

Fault Tolerance

Worker Failure

  • master가 주기적으로 모든 worker에게 ping을 날림. 일정 시간 지나도록 응답이 없으면 worker를 failed로 표시.
  • 해당 worker에 의해 수행된 map task: in-progress, completed 상태인 task를 idle 상태로 되돌려서 다른 worker에게 할당.
    • completed라도 다시 수행하는 이유 - map task의 중간 결과는 worker의 local disk에 쓰여 있어서 접근하지 못하므로.
  • 해당 worker에 의해 수행된 reduce task: in-progress 상태인 task만 idle 상태로 되돌려서 다른 worker에게 할당.
    • reduce task의 output은 최종 output file에 쓰이므로 완료된 것은 괜찮음.
  • 다시 수행하기만 하면 되니까 매우 견고하다.

Master Failure

  • master의 자료구조를 주기적으로 checkpoint. 죽었을 경우 ckpt부터만 다시 하면 됨.
  • 실제 구현은 master가 죽으면 그냥 전체 실행 실패. 사용자가 다시 재실행.
    • master는 딱 하나이기 때문에 죽을 가능성이 거의 없음. (하나 중에 하나 죽는 가능성 vs. 수백개 중에 하나 죽는 가능성)

Semantics in the Presence of Failures

  • map/reduce 함수가 deterministic하면, MapReduce 프로그램의 결과는 전체 프로그램을 sequential하게 수행한 것과 결과가 같다.
  • non-deterministic한 경우에도, 좀 약하긴 하지만 괜찮은 semantics를 가짐.
    • 어떤 reduce task R1의 output은 프로그램을 sequential하게 수행한 어떤 R1의 output과 같다.
    • 어떤 reduce task R2의 output은 프로그램을 sequential하게 수행한 어떤 R2의 output과 같다.
    • 하지만, 두 sequential 수행은 서로 다르다는 거..

Locality

  • 클러스터에서는 network bandwidth가 중요한 자원임.
  • GFS가 input data를 여러 머신의 local disk에 복제해 놓음.
  • master가 이 위치 정보를 이용해서 map task를 input data의 replica가 존재하는 머신 혹은 가까운 머신(같은 스위치 내)에 할당 시도.

Task Granularity

  • 이상적으로는 map task의 개수(M)와 reduce task의 개수(R)가 훨씬 커야 좋다.
    • dynamic load balancing, fast recovery 때문
  • 제약
    • master의 scheduling decision: O(M+R)
    • master가 메모리에 유지해야 하는 자료구조 크기: O(M*R) (map output이 R개라서인듯)
  • 보통 M은 locality를 위해 한 input data가 16~64MB가 되도록 정함. (GFS의 chunk 크기에 맞추겠지.) R은 worker의 몇배 정도..(output 파일의 개수이기도 함)
    • e.g., M=200,000 / R=5,000 / worker = 2,000

Backup Tasks

  • 이상하게 느린 머신("straggler") 때문에 마지막 몇 개의 map/reduce task가 오래 걸려서 전체 수행 시간이 길어지는 경우가 발생.
    • bad disk, 머신 설정(cache가 꺼졌다던가..) 등 때문에 이런 일들이 발생할 수 있음.
  • 전체 과정이 거의 끝나갈 무렵이 되면, 이미 수행 중인 task라도 backup으로 다른 worker에게 할당해서 수행시킴. 아무 worker나 먼저 끝내면 그걸로 완료시킴.

Refinements

Partitioning Function

  • 사용자가 reduce task의 개수(= output file의 개수) R을 지정.
  • 중간 결과의 key에 partitioning function을 사용해서 나누게 됨.
  • default: hash(key) mod R, 필요에 따라 바꿔서 사용 가능.

Ordering Guarantees

  • 특정 partition 내에서는 중간 key/value pair들이 오름차순으로 처리됨.
  • partition별로 정렬된 output file을 생성하기 쉬움.

Combiner Function

  • map task에 의해 생성된 중간 결과에는 key가 중복됨.
  • reduce 함수가 commutative & associative한 경우 중간 결과를 merge해서 보낼 수 있음.
    • word count example: <the, 1>, <the, 1>, <the, 1>, <the, 1> -> <the, 4>
  • combiner 함수는 보통 reduce 함수와 동일. output이 최종 output file에 쓰이지 않고 중간 결과로 다시 reduce task의 input이 된다는 점이 다름.

Input and Output Types

  • input type: text file에서 offset으로 key & value 지정. key/value sequence 등.
  • 사용자가 reader interface를 구현하면 새로운 input type 지원됨.
  • 꼭 file일 필요는 없음. DB로부터 record를 읽는다던가 메모리에 있는 자료구조에서 읽는다던가...
  • output도 마찬가지...

Side-effects

  • Map, reduce operator에서 aux file을 생성하는게 유용할때도 있다.
  • 그런데 이 경우 programmer가 atomic하고 idempotent하게 수행되도록 잘 생각해야 한다.
    • temp file을 쓰고 마지막에 rename을 한다던지...
  • 여러 파일을 output 하려면 2-phase commit을 해야 되는데 그런건 제공하지 않는다.
    • Q) 왜 2-phase commit을 해야되나?

Skipping Bad Records

  • Map, Reduce function이 특정 record를 처리 할 때 버그로 인해 죽는 경우, 해당 record만 넘길 수 있도록 하는 실행 모드가 있다.
  • signal handler를 이용해 user code가 죽으면 signal handler가 master에게 packet을 보낸다.
  • master는 다음에 재 실행할때에 그 record를 건너 뛰라고 지시한다.

Local Execution

  • 디버깅을 위해 local에서 실행해 볼 수 있음

Status Information

  • master가 web server를 통해 status page를 제공
    • 몇개의 task가 끝났나, 혹은 진행중인가.
    • 중간 데이터의 크기
    • 진행 속도 등

Counters

  • user code에서 사용할 수 있는 counter 제공
  • 각 worker들은 ping msg를 통해 master에게 counter 정보를 보내고, master가 취합해 종료후 알려준다.
    • 재실행으로 인해 counter가 중복 계산 되지 않도록 master가 잘 취합한다.