-- [Stray web-proxy output captured with this file; not part of the program] Deprecated: The each() function is deprecated. This message will be suppressed on further calls in /home/zhenxiangba/zhenxiangba.com/public_html/phproxy-improved-master/index.php on line 456
{-# OPTIONS -fscoped-type-variables #-}
{-
Google's MapReduce programming model revisited
(C) Ralf Laemmel, 2006
We again define the MapReduce abstraction.
This time we take some distribution aspects into account.
We use explicit quantification for all library functions.
This helps us not to get lost regarding all the different "maps".
We also enable lexically scoped type variables for convenience.
-}
module Step8 where
import Prelude hiding (map,foldl,fst,snd,concat)
import qualified Prelude (map,foldl,fst,snd,concat)
import qualified Data.List -- A library for advanced list-processing
import qualified Data.Map -- A library type for dictionaries
-- | An alias for Haskell's dictionary type.
type Dict k v = Data.Map.Map k v

-- | MapReduce with some distribution aspects modelled explicitly.
--
-- The input dictionary is split into pieces; every piece is mapped,
-- partitioned and combined locally; the per-piece partitions are then merged,
-- grouped and reduced; finally the per-partition results are united.
--
-- Notes on behaviour that follows directly from the code below:
--
-- * The partitioning function is expected to return a number in
--   @[1 .. parts]@.  Intermediate pairs whose key code falls outside that
--   range match no partition and are silently dropped.
-- * Within a group, later values are prepended, so the combiner and reducer
--   see the values of a key in reverse order of encounter.
-- * A combiner or reducer that returns 'Nothing' removes the key.
mapReduce :: forall k1 k2 v1 v2 v3 v4. (Ord k1, Ord k2)
          => (v1 -> Int)                -- Size of input values
          -> Int                        -- Split size for map tasks
          -> Int                        -- Number of partitions
          -> (k2 -> Int)                -- Partitioning for keys
          -> (k1 -> v1 -> [(k2,v2)])    -- The map function
          -> (k2 -> [v2] -> Maybe v3)   -- The combiner function
          -> (k2 -> [v3] -> Maybe v4)   -- The reduce function
          -> Dict k1 v1                 -- Input data
          -> Dict k2 v4                 -- Output data
mapReduce size split parts keycode map combiner reduce =
    concatReducts                       -- 9. Concatenate results
  . Prelude.map (
        reducePerKey                    -- 8. Apply reduce to each partition
      . groupByKey )                    -- 7. Group intermediates per key
  . mergeParts                          -- 6. Merge scattered partitions
  . Prelude.map (
        Prelude.map (
            combinePerKey               -- 5. Apply combiner locally
          . groupByKey )                -- 4. Group local intermediate data
      . partition                       -- 3. Partition local intermediate data
      . mapPerKey )                     -- 2. Apply map locally to each piece
  . splitInput                          -- 1. Split up input data into pieces
  where

    -- Split the input into pieces.  A pair joins the current piece while the
    -- accumulated size stays below the split size; a piece is never left
    -- empty, so an oversized value still gets a piece of its own.
    splitInput :: Dict k1 v1 -> [Dict k1 v1]
    splitInput =
        Prelude.map Data.Map.fromList   -- 4. Turn list of pairs into dictionary
      . Prelude.fst                     -- 3. Project away size of last piece
      . Data.List.foldl' splitHelper ([[]], 0) -- 2. Splitting as a strict list fold
      . Data.Map.toList                 -- 1. Access dictionary as list of pairs
      where
        splitHelper :: ([[(k1,v1)]],Int) -- Pieces so far with size of head
                    -> (k1,v1)           -- The key/value pair to be considered
                    -> ([[(k1,v1)]],Int) -- New set of pieces and size of head
        -- The fold is seeded with one (empty) piece, so this equation is not
        -- reached; it only keeps the helper total.
        splitHelper ([], _) x@(_, v) = ([[x]], size v)
        splitHelper (piece : rest, s) x@(_, v)
          | grown < split || Prelude.null piece = ((x : piece) : rest, grown)
          | otherwise                           = ([x] : piece : rest, size v)
          where
            grown = size v + s

    -- Apply the map function to every key/value pair of a piece.
    mapPerKey :: Dict k1 v1 -> [(k2,v2)]
    mapPerKey = Prelude.concatMap (uncurry map)
              . Data.Map.toList

    -- Distribute intermediate pairs over partitions 1..parts by key code.
    partition :: [(k2,v2)] -> [[(k2,v2)]]
    partition task = Prelude.map partitionHelper [1..parts]
      where
        partitionHelper :: Int -> [(k2,v2)]
        partitionHelper p = Prelude.filter ((== p) . keycode . Prelude.fst) task

    -- Collect all values per key; a later value is put in front of the
    -- values already collected for its key.
    groupByKey :: [(k2,v2or3)] -> Dict k2 [v2or3]
    groupByKey = Data.Map.fromListWith (++)
               . Prelude.map (\(k, v) -> (k, [v]))

    -- Apply the combiner per key, dropping keys for which it yields Nothing.
    combinePerKey :: Dict k2 [v2] -> Dict k2 v3
    combinePerKey = Data.Map.mapMaybeWithKey combiner

    -- Regroup [piece][partition] into [partition][piece] and flatten each
    -- partition into one list of pairs.
    mergeParts :: [[Dict k2 v3]] -> [[(k2,v3)]]
    mergeParts =
        Prelude.map (Prelude.concatMap Data.Map.toList) -- 2. Unite partition
      . Data.List.transpose                             -- 1. Transpose grouping

    -- Apply reduce per key, dropping keys for which it yields Nothing.
    reducePerKey :: Dict k2 [v3] -> Dict k2 v4
    reducePerKey = Data.Map.mapMaybeWithKey reduce

    -- Unite the per-partition results into a single dictionary.
    concatReducts :: [Dict k2 v4] -> Dict k2 v4
    concatReducts = Data.Map.fromList
                  . Prelude.concatMap Data.Map.toList
-- The encoding of word-occurrence counting
--
-- Counts how often each word occurs across all input values.  The same
-- summing function serves as both combiner and reducer.  The explicit
-- signature fixes the count type to Integer, which is what defaulting
-- selected when the binding had no signature.
wordOccurrenceCount :: Ord k => Data.Map.Map k String -> Data.Map.Map String Integer
wordOccurrenceCount
  = mapReduce
      (const 1)   -- Every input value counts as size 1
      3           -- Split size for map tasks
      parts       -- Number of partitions
      myHash
      myMap
      myReduce    -- Combiner: sum the local counts
      myReduce    -- Reducer: sum the combined counts
  where
    -- Number of partitions; shared with the hash so the two cannot drift apart.
    parts :: Int
    parts = 7

    -- Partition number in 1..parts derived from the first character of a word.
    -- `words` never produces an empty word; that case only keeps the hash total.
    myHash :: String -> Int
    myHash []      = 1
    myHash (c : _) = Prelude.fromEnum c `mod` parts + 1

    -- Emit (word, 1) for every word of the value; the input key is ignored.
    myMap :: k -> String -> [(String, Integer)]
    myMap _ doc = [(w, 1) | w <- words doc]

    myReduce :: String -> [Integer] -> Maybe Integer
    myReduce _ = Just . sum
-- Word-occurrence counting with a pass-through combiner: the combiner hands
-- each local list of counts on unchanged, and the reducer concatenates those
-- lists and sums them.  The explicit signature fixes the count type to
-- Integer, which is what defaulting selected when the binding had no signature.
wordOccurrenceCount' :: Ord k => Data.Map.Map k String -> Data.Map.Map String Integer
wordOccurrenceCount'
  = mapReduce
      (const 1)   -- Every input value counts as size 1
      3           -- Split size for map tasks
      parts       -- Number of partitions
      myHash
      myMap
      myCombiner
      myReduce
  where
    -- Number of partitions; shared with the hash so the two cannot drift apart.
    parts :: Int
    parts = 7

    -- Partition number in 1..parts derived from the first character of a word.
    -- `words` never produces an empty word; that case only keeps the hash total.
    myHash :: String -> Int
    myHash []      = 1
    myHash (c : _) = Prelude.fromEnum c `mod` parts + 1

    -- Emit (word, 1) for every word of the value; the input key is ignored.
    myMap :: k -> String -> [(String, Integer)]
    myMap _ doc = [(w, 1) | w <- words doc]

    -- Keep the local counts as they are.
    myCombiner :: String -> [Integer] -> Maybe [Integer]
    myCombiner _ = Just

    -- Flatten the per-piece count lists and sum them.
    myReduce :: String -> [[Integer]] -> Maybe Integer
    myReduce _ = Just . sum . Prelude.concat
-- Test harness
--
-- Runs both word-count encodings on the same sample input, prints each
-- result, and prints whether the two results are equal.
main :: IO ()
main = do
    print plain
    print combined
    print (plain == combined)
  where
    plain    = wordOccurrenceCount input
    combined = wordOccurrenceCount' input

    -- Sample documents keyed "a" to "h"; the keys are distinct.
    input :: Data.Map.Map String String
    input = Data.Map.fromList
      [ ("a", "1 3 2")
      , ("b", "3 2 3 4 4 4 4")
      , ("c", "5 7 5 5 5 6 6 6 6 6 6")
      , ("d", "7 5 7 7 7 7 7 8 8 8 9 8 8 8 8")
      , ("e", "9 8 9 9 9 9 9 9 9 10 10 10 10 11 10 10 10 10 10")
      , ("f", "11 11 11 11 11 11 11 11 11 11 12")
      , ("g", "12 12 12 12 12 12 12 12 12 12 13 12")
      , ("h", "13 13 13 13 13 13 13 13 13 13 10 13 13")
      ]