Примеры использования Hadoop без программирования на Java

THE HOLY BIBLE - King James Version - БИБЛИЯ в Синодальном переводе
"Нас Атакуют!" Изобличи козни лукавого, запрети диаволу

Примеры использования Hadoop без программирования на Java

Технология "Больших Данных" (Big Data) вошла в моду и используется для обработки больших объёмов неструктурированных данных, подготовки, предварительной проверки и сортировки записей перед загрузкой в хранилища данных и во многих совершенно новых областях знаний. В то же время, подготовка программ для распределённых вычислений по методике MapReduce требует уверенного знания Java. И хотя имеющиеся в арсенале hadoop средства позволяют решать большинство задач без необходимости программирования, достоинства таких средств и даже само существование их являются сюрпризом для многих пользователей системы.

Предлагаемая читателю заметка представляет некоторые простые случаи использования hadoop для решения прикладных задач без необходимости программирования, что делает систему привлекательной для простых пользователей - непрограммистов.

Прежде чем мы продолжим, я хотел бы привести строки из Евангелия:



............... == Соборное послание святого апостола Иуды == ..................
=== Глава 1, Стих 25 ===
24 Могущему же соблюсти вас от падения и поставить пред славою Своею
 непорочными в радости,
25 Единому Премудрому Богу, Спасителю нашему чрез Иисуса Христа Господа
 нашего, слава и величие, сила и власть прежде всех веков, ныне и во все
 веки. Аминь.

Лично для вас благая весть - Единородный Сын Божий Иисус Христос любит вас, Он взошёл на крест за ваши грехи, был распят и на третий день воскрес, сел одесную Бога и открыл нам дорогу в Царствие Небесное.

Святая Троица - Отец, Сын и Дух Святой - оберегает нас в нашей жизни, милосердно ожидая нашего покаяния в тяжких грехах, совершённых нами в прошлом. Кто-то, незнакомый с Иисусом Христом, может считать что он не сделал ничего зазорного и не нуждается в прощении. Как жестоко ошибается такой человек, какую незавидную участь он готовит для себя, буквально сам лишая себя Божией милости!

Каждый день, явно или неявно, нас искушает диавол, стремясь повесить ярмо нам на шеи, согнув под свою власть. И только Господь может соблюсти нас от падения, защитить и оправдать нас, поставив непорочными в радости, по Его безграничной милости к творениям рук Его. Но прежде всего каждый из нас должен задуматься о своей жизни, осознать как много грешил он в прошлом. Раскайся в грехах своих, признай славу и величие Господа - и только тогда Его сила и власть спасут и оправдают тебя. Твоя же обязанность - трезвиться и не допускать новых падений.

Покайтесь, примите Иисуса как вашего Спасителя, ибо наступают последние времена и время близко - стоит Судья у ворот.

Пожалуйста, в своих каждодневных трудах, какими бы занятыми вы себе ни казались - находите время для Бога, Его заповедей и Библии.

На главной странице этого сайта вы найдете программу для чтения Библии в командной строке - буду очень рад если программа окажется полезной. Пожалуйста, читайте Библию, на экране или в печатном виде - вы будете искренне удивлены как много там сказано лично про вас и ваши обстоятельства.


Вернёмся к нашим техническим деталям.

Итак, определим действительно ли hadoop так сложен для нас, как принято считать? Сама инфраструктура, без сомнения, представляет из себя очень современную сложную и высоко технологичную систему. Это становится очевидным после беглого взгляда на предложения Big Data от "больших компаний с мировым именем" - за исключением одной, все остальные просто-напросто копируют тот самый hadoop, навешивая на него дополнительные, якобы очень полезные и революционные, возможности (и ценник с впечатляющим количеством нулей). Ни один из "корпоративных бегемотов" не решается разработать собственную реализацию, очевидно опасаясь позорного фиаско. Но что же делать пользователю системы?

Пользователями MapReduce framework могут быть разные категории ИТ профессионалов - разработчики, администраторы - или обычные бизнес-пользователи. Для каждой категории людей hadoop имеет свои средства. Разработчики, используя Java, могут воздействовать на функциональность системы в полной мере и настолько глубоко, насколько необходимо. Прочие могут использовать системы типа Pig для "прозрачного" создания и выполнения задач MapReduce. Но самым простым, гибким и очень эффективным средством работы с hadoop является так называемый "streaming".

Идея проста - если пользователь в состоянии написать скрипт или программу, принимающую данные со стандартного ввода, обрабатывающую их надлежащим образом и выдающую результат на стандартный вывод - такой скрипт может быть "подключён" в самое сердце hadoop и будет выполнять те самые высокоэффективные параллельные вычисления на каждом из узлов кластера. Да, именно так - просто и эффективно. Никакого знания Java или MapReduce API нам не потребуется.

Тестирование будет производиться на уже имеющемся кластере. Основная идея - предварительная выборка и сортировка данных для последующей загрузки в базу данных. Для начала мы проведём простенький тест, для оценки работоспособности нашего кластера и написанных скриптов. Затем те же самые скрипты будут использованы для сортировки около 2Тб логов вебсервера. Такой подход позволит нам сэкономить ресурсы нашего сервера баз данных и ускорить последующую обработку.

Необходимые средства

Для выполнения поставленной задачи быстро и легко нам понадобится серьёзное оборудование, уже установленное и налаженное в нашей серверной комнате. Если у вас нет доступа к hadoop кластеру - всё равно существуют варианты. Я бы порекомендовал провести тестирование работоспособности скриптов на виртуальной машине, предлагаемой компанией Cloudera. При наличии средств время на hadoop кластере может быть куплено у одного из множества провайдеров в Интернете. Либо, если имеется необходимое количество компьютеров, вы можете построить свой кластер используя бесплатный продукт той же самой Cloudera. Я настоятельно не рекомендую пробовать установить кластер из оригинальной версии с сайта Apache.

Опыт программирования на Java нам не понадобится.

План действий

Мы должны выполнить определённую последовательность действий:

  • Подготовить исходные данные;
  • Переместить готовые данные на HDFS - файловую систему кластера hadoop;
  • Написать необходимые скрипты (известные как "mapper" и "reducer");
  • Выполнить основную задачу hadoop, которая произведёт набор отсортированных файлов;
  • Произвести вторичную сортировку - объединение файлов результата;
  • Извлечь конечный результат из HDFS.

    Отсортированный конечный файл может быть помещён на файловую систему, с которой наша база данных может загружать данные. В случае использования Оракл наиболее эффективное средство - внешние таблицы.

    Начальная проверка функциональности

    Как было сказано, прежде чем мы начнём сортировать терабайты данных, что может занять часы, нам необходимо провести простой функциональный тест. Я подготовил 4 маленьких тестовых файла, содержащих неотсортированные записи на английском и русском языках - мы последовательно выполним вышеописанные действия над этими файлами. Основная задача - убедиться что наши "mapper" и "reducer" понятны инфраструктуре hadoop.

    Исходные данные

    Эти файлы были созданы и хранятся на локальной файловой системе одного из узлов моего hadoop кластера. Все действия будут выполняться непосредственно на узлах кластера.

    file1.txt
    ----------
    высокая
    производительность
    и
    минимальное
    время
    отклика
    для
    hadoop
    Рекомендуем
    использовать
    10
    Gbps
    Ethernet
    или
    Infiniband
    для
    HDFS
    
    file2.txt
    ----------
    Обычно
    конечный
    набор
    данных
    будет
    содержать
    единственный
    текстовый
    файл
    который
    может
    быть
    помещён
    
    file3.txt
    ---------
    hadoop
    cluster
    работает
    и
    в
    состоянии
    понять
    наши
    mappers
    и
    reducers
    
    file4.txt
    ----------
    Жили-были
    два
    кота
    восемь
    лапок
    два
    хвоста
    Если
    знаешь
    this
    стишок
    тo
    и
    знаешь
    who
    Маршак
    is
    

    Проверим наличие этих файлов на локальной файловой системе:

    .............. == Соборное послание святого апостола Иакова == .................
    === Глава 3, Стих 4 ===
    4 Вот, и корабли,  как  ни  велики  они  и  как  ни  сильными  ветрами  носятся,
    небольшим рулем направляются, куда хочет кормчий;
    5 так и язык -- небольшой член,  но  много  делает.  Посмотри,  небольшой  огонь
    как много вещества зажигает!
    6 И язык -- огонь, прикраса неправды; язык в  таком  положении  находится  между
    членами нашими, что оскверняет все тело и воспаляет  круг  жизни,  будучи  сам
    воспаляем от геенны.
    
    (b+/b-, c+/c-, +/-, *) >
    [oracle@hadoop]$ pwd
    /home/oracle/htest
    [oracle@hadoop]$ ls -latrh
    total 28K
    -rw-rw-r--  1 oracle oracle   73 Jul 17 20:22 file1.txt
    -rw-rw-r--  1 oracle oracle   79 Jul 17 20:22 file2.txt
    -rw-rw-r--  1 oracle oracle   70 Jul 17 20:22 file3.txt
    -rw-rw-r--  1 oracle oracle   56 Jul 17 20:22 file4.txt
    drwx------ 43 oracle oracle 4.0K Jul 17 20:22 ..
    drwxrwxr-x  2 oracle oracle 4.0K Jul 17 20:22 .
    

    Перемещение готовых данных на HDFS

    Копировать данные на HDFS просто - мы используем сам hadoop в качестве утилиты командной строки. В конце концов, ведь именно слоны перемещают тяжести, правда?

    ............. == Послание к Римлянам святого апостола Павла == .................
    === Глава 10, Стих 10 ===
    9 Ибо если устами твоими будешь исповедывать Иисуса  Господом  и  сердцем  твоим
    веровать, что Бог воскресил Его из мертвых, то спасешься,
    10 потому что сердцем веруют к праведности, а  устами  исповедуют  ко  спасению.
    
    (b+/b-, c+/c-, +/-, *) >
    
    [oracle@hadoop]$ hadoop fs -ls /user/oracle
    Found 3 items
    drwx------   - oracle hadoop          0 2012-07-05 05:42 /user/oracle/.staging
    drwxr-xr-x   - oracle hadoop          0 2012-06-17 18:06 /user/oracle/logs
    drwxrwxrwx   - oracle hadoop          0 2012-06-25 12:45 /user/oracle/olhcache
    [oracle@hadoop]$ hadoop fs -mkdir /user/oracle/data
    [oracle@hadoop]$ hadoop fs -ls /user/oracle
    Found 4 items
    drwx------   - oracle hadoop          0 2012-07-05 05:42 /user/oracle/.staging
    drwxr-xr-x   - oracle hadoop          0 2012-07-17 20:28 /user/oracle/data
    drwxr-xr-x   - oracle hadoop          0 2012-06-17 18:06 /user/oracle/logs
    drwxrwxrwx   - oracle hadoop          0 2012-06-25 12:45 /user/oracle/olhcache
    [oracle@hadoop]$
    [oracle@hadoop]$ hadoop fs -copyFromLocal *.txt /user/oracle/data/
    [oracle@hadoop]$ hadoop fs -ls /user/oracle/data
    Found 4 items
    -rw-r--r--   3 oracle hadoop         73 2012-07-17 20:29 /user/oracle/data/file1.txt
    -rw-r--r--   3 oracle hadoop         79 2012-07-17 20:29 /user/oracle/data/file2.txt
    -rw-r--r--   3 oracle hadoop         70 2012-07-17 20:29 /user/oracle/data/file3.txt
    -rw-r--r--   3 oracle hadoop         56 2012-07-17 20:29 /user/oracle/data/file4.txt
    [oracle@hadoop]$ hadoop fs -tail /user/oracle/data/file4.txt
    Жили-были
    два
    кота
    восемь
    лапок
    два
    хвоста
    Если
    знаешь
    this
    стишок
    тo
    и
    знаешь
    who
    Маршак
    is
    [oracle@hadoop]$
    

    Итак, все наши файлы уже видны всем компонентам инфраструктуры hadoop и могут быть использованы всеми узлами кластера. Слонёнок справился со своим первым заданием на отлично!

    Написание "mapper" и "reducer" скриптов

    Что радует - эти скрипты будут очень простые и универсальные, мы сможем использовать их как шаблон для последующих задач. Код скрипта выглядит так:

    hadoop fs -rmr /user/oracle/result
    hadoop jar /usr/lib/hadoop-0.20/contrib/streaming/hadoop-streaming-0.20.2-cdh3u4.jar \
    > -input /user/oracle/data \
    > -output /user/oracle/result \
    > -mapper /bin/cat
    

    Мы используем одну из возможностей hadoop - упомянутый в начале заметки "streaming", но с Java библиотекой, переработанной для нас компанией Cloudera. Вы можете также попробовать использовать JAR файл, предоставляемый самим проектом Apache Hadoop.

    Но посмотрите на "mapper" - это не совпадение, мы используем стандартную команду линукса "cat". Эта программа просто копирует ввод на вывод. Зачем? Давайте запустим наш hadoop скрипт и посмотрим. Кстати, в нашем первом скрипте мы не используем "reducer".

    Выполнение основной задачи hadoop

    И снова мы поручим всю тяжелую работу нашему слонёнку и просто запустим скрипт:

    ................... == От Луки святое благовествование == ......................
    === Глава 17, Стих 21 ===
    20 Быв же спрошен  фарисеями,  когда  придет  Царствие  Божие,  отвечал  им:  не
    придет Царствие Божие приметным образом,
    21 и не скажут:  вот,  оно  здесь,  или:  вот,  там.  Ибо  вот,  Царствие  Божие
    внутрь вас есть.
    
    (b+/b-, c+/c-, +/-, *) >
    
    [oracle@hadoop]$ hadoop jar /usr/lib/hadoop-0.20/contrib/streaming/hadoop-streaming-0.20.2-cdh3u4.jar \
    -input /user/oracle/data -output /user/oracle/result -mapper /bin/cat
    packageJobJar: [/tmp/hadoop-oracle/hadoop-unjar7954453134228920110/] []
    /tmp/streamjob2262620489771001950.jar tmpDir=null
    12/07/17 20:41:47 WARN snappy.LoadSnappy: Snappy native library is available
    12/07/17 20:41:47 INFO util.NativeCodeLoader: Loaded the native-hadoop library
    12/07/17 20:41:47 INFO snappy.LoadSnappy: Snappy native library loaded
    12/07/17 20:41:47 INFO mapred.FileInputFormat: Total input paths to process : 4
    12/07/17 20:41:47 INFO streaming.StreamJob: getLocalDirs(): [/tmp/hadoop-oracle/mapred/local]
    12/07/17 20:41:47 INFO streaming.StreamJob: Running job: job_201207171943_0004
    12/07/17 20:41:47 INFO streaming.StreamJob: To kill this job, run:
    12/07/17 20:41:47 INFO streaming.StreamJob: /usr/lib/hadoop-0.20/bin/hadoop job
    -Dmapred.job.tracker=localhost.localdomain:8021 -kill job_201207171943_0004
    12/07/17 20:41:47 INFO streaming.StreamJob: Tracking URL:
    http://localhost.localdomain:50030/jobdetails.jsp?jobid=job_201207171943_0004
    12/07/17 20:41:48 INFO streaming.StreamJob:  map 0%  reduce 0%
    12/07/17 20:41:57 INFO streaming.StreamJob:  map 50%  reduce 0%
    12/07/17 20:42:03 INFO streaming.StreamJob:  map 100%  reduce 0%
    12/07/17 20:42:09 INFO streaming.StreamJob:  map 100%  reduce 33%
    12/07/17 20:42:11 INFO streaming.StreamJob:  map 100%  reduce 100%
    12/07/17 20:42:14 INFO streaming.StreamJob: Job complete: job_201207171943_0004
    12/07/17 20:42:14 INFO streaming.StreamJob: Output: /user/oracle/result
    

    Заметьте - hadoop нашёл и прочитал все входные файлы, о чём свидетельствует запись "Total input paths to process : 4". Сообщение "Job complete" приглашает нас взглянуть на результат работы hadoop скрипта:

    ..................... == Первая книга Паралипоменон == .........................
    === Глава 10, Стих 14 ===
    13 Так умер Саул за свое беззаконие, которое он сделал  пред  Господом,  за  то,
    что не соблюл слова Господня и обратился к волшебнице с вопросом,
    14 а не взыскал Господа.  _За_  _то_  Он  и  умертвил  его,  и  передал  царство
    Давиду, сыну Иессееву.
    
    (b+/b-, c+/c-, +/-, *) >
    
    oracle@hadoop]$ hadoop fs -ls /user/oracle/result
    Found 3 items
    -rw-r--r--   3 oracle hadoop          0 2012-07-17 20:42 /user/oracle/result/_SUCCESS
    drwxrwxrwx   - oracle hadoop          0 2012-07-17 20:41 /user/oracle/result/_logs
    -rw-r--r--   3 oracle hadoop        323 2012-07-17 20:42 /user/oracle/result/part-00000
    

    Нас интересуют файлы с именем "part-*", из их имён понятно на сколько фрагментов был разбит выходной файл. Что же мы ожидаем увидеть на выходе? Спецификация hadoop требует сортировки данных после этапа "map", таким образом, даже при отсутствии "reducer", мы надеемся найти в выходном файле правильно отсортированные строки из всех четырёх входных файлов.

    [oracle@hadoop]$ hadoop fs -cat /user/oracle/result/part-00000
    10
    Ethernet
    Gbps
    HDFS
    Infiniband
    cluster
    hadoop
    hadoop
    is
    mappers
    reducers
    this
    who
    Если
    Жили-были
    Маршак
    Обычно
    Рекомендуем
    будет
    быть
    в
    восемь
    время
    высокая
    данных
    два
    два
    для
    для
    единственный
    знаешь
    знаешь
    и
    и
    и
    и
    или
    использовать
    конечный
    кота
    который
    лапок
    минимальное
    может
    набор
    наши
    отклика
    помещён
    понять
    производительность
    работает
    содержать
    состоянии
    стишок
    тo
    текстовый
    файл
    хвоста
    [oracle@hadoop]$
    

    Порядок слов выглядит правильным, но всегда лучше проверить дважды. Я перенаправлю вывод на вход стандартной утилиты линукс "sort":

    [oracle@hadoop]$ hadoop fs -cat /user/oracle/result/part-00000 | sort
    10
    cluster
    Ethernet
    Gbps
    hadoop
    hadoop
    HDFS
    Infiniband
    is
    mappers
    reducers
    this
    who
    будет
    быть
    в
    восемь
    время
    высокая
    данных
    два
    два
    для
    для
    единственный
    Если
    Жили-были
    знаешь
    знаешь
    и
    и
    и
    и
    или
    использовать
    конечный
    кота
    который
    лапок
    Маршак
    минимальное
    может
    набор
    наши
    Обычно
    отклика
    помещён
    понять
    производительность
    работает
    Рекомендуем
    содержать
    состоянии
    стишок
    тo
    текстовый
    файл
    хвоста
    [oracle@hadoop]$
    

    Этот второй вариант сортировки мне нравится больше первого! Могу ли я заставить hadoop воспроизвести такой же результат? Да, и именно для этого нам понадобится "reducer" - утилита "sort". В этом случае вывод всех "mapper"ов (cat) будет перенаправлен на вход "reducer"а (sort), производя желаемый результат.

    Повторный запуск скрипта с "reducer"

    Изменим наш скрипт-шаблон и запустим его вновь. Но прежде всего не забудьте скопировать с HDFS на локальный диск ваши прошлые результаты - скрипт полностью удалит и создаст заново директорию "result".

    hadoop fs -rmr /user/oracle/result
    hadoop jar /usr/lib/hadoop-0.20/contrib/streaming/hadoop-streaming-0.20.2-cdh3u4.jar \
    > -input /user/oracle/data \
    > -output /user/oracle/result \
    > -mapper /bin/cat \
    > -reducer /bin/sort
    

    Как видно, мы добавили "reducer". Перезапустим скрипт вторично:

    ........... == Второе соборное послание святого апостола Петра == ..............
    === Глава 2, Стих 9 ===
    4 Ибо, если Бог  ангелов  согрешивших  не  пощадил,  но,  связав  узами  адского
    мрака, предал блюсти на суд для наказания;
    5 и если не пощадил первого мира, но в  восьми  душах  сохранил  семейство  Ноя,
    проповедника правды, когда навел потоп на мир нечестивых;
    6 и если города Содомские и  Гоморрские,  осудив  на  истребление,  превратил  в
    пепел, показав пример будущим нечестивцам,
    7 а   праведного   Лота,   утомленного   обращением   между   людьми    неистово
    развратными, избавил
    8 (ибо сей праведник, живя между  ними,  ежедневно  мучился  в  праведной  душе,
    видя и слыша дела беззаконные) --
    9 то, конечно, знает  Господь,  как  избавлять  благочестивых  от  искушения,  а
    беззаконников соблюдать ко дню суда, для наказания,
    10 а  наипаче  тех,  которые  идут  вслед  скверных  похотей  плоти,   презирают
    начальства, дерзки, своевольны и не страшатся злословить высших,
    11 тогда как и Ангелы, превосходя их крепостью и силою,  не  произносят  на  них
    пред Господом укоризненного суда.
    
    (b+/b-, c+/c-, +/-, *) >
    
    [oracle@hadoop]$ hadoop jar /usr/lib/hadoop-0.20/contrib/streaming/hadoop-streaming-0.20.2-cdh3u4.jar \
    > -input /user/oracle/data \
    > -output /user/oracle/result \
    > -mapper /bin/cat \
    > -reducer /bin/sort
    packageJobJar: [/tmp/hadoop-oracle/hadoop-unjar3453045901145794770/] []
    /tmp/streamjob1539054464027589119.jar tmpDir=null
    12/07/17 20:58:18 WARN snappy.LoadSnappy: Snappy native library is available
    12/07/17 20:58:18 INFO util.NativeCodeLoader: Loaded the native-hadoop library
    12/07/17 20:58:18 INFO snappy.LoadSnappy: Snappy native library loaded
    12/07/17 20:58:18 INFO mapred.FileInputFormat: Total input paths to process : 4
    12/07/17 20:58:19 INFO streaming.StreamJob: getLocalDirs(): [/tmp/hadoop-oracle/mapred/local]
    12/07/17 20:58:19 INFO streaming.StreamJob: Running job: job_201207171943_0005
    12/07/17 20:58:19 INFO streaming.StreamJob: To kill this job, run:
    12/07/17 20:58:19 INFO streaming.StreamJob: /usr/lib/hadoop-0.20/bin/hadoop job
    -Dmapred.job.tracker=localhost.localdomain:8021 -kill job_201207171943_0005
    12/07/17 20:58:19 INFO streaming.StreamJob: Tracking URL:
    http://localhost.localdomain:50030/jobdetails.jsp?jobid=job_201207171943_0005
    12/07/17 20:58:20 INFO streaming.StreamJob:  map 0%  reduce 0%
    12/07/17 20:58:28 INFO streaming.StreamJob:  map 50%  reduce 0%
    12/07/17 20:58:34 INFO streaming.StreamJob:  map 100%  reduce 0%
    12/07/17 20:58:40 INFO streaming.StreamJob:  map 100%  reduce 33%
    12/07/17 20:58:42 INFO streaming.StreamJob:  map 100%  reduce 100%
    12/07/17 20:58:44 INFO streaming.StreamJob: Job complete: job_201207171943_0005
    12/07/17 20:58:44 INFO streaming.StreamJob: Output: /user/oracle/result
    [oracle@hadoop]$
    
    [oracle@hadoop]$ hadoop fs -ls /user/oracle/result
    Found 3 items
    -rw-r--r--   3 oracle hadoop          0 2012-07-17 20:58 /user/oracle/result/_SUCCESS
    drwxrwxrwx   - oracle hadoop          0 2012-07-17 20:58 /user/oracle/result/_logs
    -rw-r--r--   3 oracle hadoop        323 2012-07-17 20:58 /user/oracle/result/part-00000
    [oracle@hadoop]$ hadoop fs -cat /user/oracle/result/part-00000
    10
    cluster
    Ethernet
    Gbps
    hadoop
    hadoop
    HDFS
    Infiniband
    is
    mappers
    reducers
    this
    who
    будет
    быть
    в
    восемь
    время
    высокая
    данных
    два
    два
    для
    для
    единственный
    Если
    Жили-были
    знаешь
    знаешь
    и
    и
    и
    и
    или
    использовать
    конечный
    кота
    который
    лапок
    Маршак
    минимальное
    может
    набор
    наши
    Обычно
    отклика
    помещён
    понять
    производительность
    работает
    Рекомендуем
    содержать
    состоянии
    стишок
    тo
    текстовый
    файл
    хвоста
    [oracle@hadoop]$
    

    Обратите внимание, hadoop правильно работает с Unicode и в состоянии правильно отсортировать русские и английские слова. Используем средства линукс для ещё одной проверки:

    [oracle@hadoop]$ hadoop fs -cat /user/oracle/result/part-00000 | sort >/tmp/2
    [oracle@hadoop]$ hadoop fs -cat /user/oracle/result/part-00000 >/tmp/1
    [oracle@hadoop]$ diff /tmp/1  /tmp/2
    [oracle@hadoop]$
    

    Вывод утилиты "diff" свидетельствует о полной идентичности обоих файлов - hadoop отсортировал вывод так, как нам было необходимо.

    После удачного тестирования мы знаем, что наши скрипты работают правильно на hadoop кластере, используя "streaming". Впрочем, это не совсем так - внимательный читатель мог обратить внимание на имя библиотеки в предыдущих тестах и догадаться, что я использовал виртуальную машину, предоставляемую Cloudera для демонстрационных целей. И теперь настало время начать использовать настоящий кластер hadoop.

    Работа с настоящими данными

    Следующая задача - предварительная сортировка большого количества логов вебсервера, для их последующей загрузки средствами ETL в хранилище данных. Такой подход позволяет снизить нагрузку на базу данных и сервер ETL. Естественно, мы также могли бы включить большую часть обработки непосредственно в hadoop скрипт (например, определение длины сеанса индивидуального пользователя). Но для простоты примера я решил ограничиться сортировкой лог-записей по IP адресу и имени пользователя.

    Посмотрим на формат исходных записей и их количество:

    [oracle@hadoop]$ hadoop fs -ls /user/oracle/hadoop/apache_logs_tb | head
    Found 4005 items
    -rw-r--r--   3 oracle hadoop  436586628 2012-07-05 16:51 /user/oracle/hadoop/apache_logs_tb/apache-045101650.log
    -rw-r--r--   3 oracle hadoop  436796712 2012-07-05 16:51 /user/oracle/hadoop/apache_logs_tb/apache-045107619.log
    -rw-r--r--   3 oracle hadoop  435800852 2012-07-05 16:51 /user/oracle/hadoop/apache_logs_tb/apache-045112610.log
    -rw-r--r--   3 oracle hadoop  437105416 2012-07-05 16:51 /user/oracle/hadoop/apache_logs_tb/apache-045116792.log
    -rw-r--r--   3 oracle hadoop  436683731 2012-07-05 16:51 /user/oracle/hadoop/apache_logs_tb/apache-04512141.log
    -rw-r--r--   3 oracle hadoop  436746609 2012-07-05 16:51 /user/oracle/hadoop/apache_logs_tb/apache-04512525.log
    -rw-r--r--   3 oracle hadoop  436834383 2012-07-05 16:51 /user/oracle/hadoop/apache_logs_tb/apache-045129394.log
    -rw-r--r--   3 oracle hadoop  436602364 2012-07-05 16:51 /user/oracle/hadoop/apache_logs_tb/apache-045133688.log
    -rw-r--r--   3 oracle hadoop  436491954 2012-07-05 16:51 /user/oracle/hadoop/apache_logs_tb/apache-04513801.log
    [oracle@hadoop]$ hadoop fs -ls /user/oracle/hadoop/apache_logs_tb | wc -l
    4006
    [oracle@hadoop]$
    [oracle@hadoop]$ hadoop fs -dus /user/oracle/hadoop/apache_logs_tb
    hdfs://node01/user/oracle/hadoop/apache_logs_tb 1746689019469
    [oracle@hadoop]$
    [oracle@hadoop]$ hadoop fs -tail /user/oracle/hadoop/apache_logs_tb/apache-045101650.log | tail -1
    102.134.002.480 - 1013506 [07/Jul/2012:04:16:12 -0700] "GET /view?prod=rack_36u_30_4in_deep_rack_black HTTP/1.0"
    200 2000 "/view?prod=600gb_usb_2_0_feedback_megadisk_drive" "Mozilla/5.0 (Android 2.2; Windows; U;
    Windows NT 6.1; en-US) AppleWebKit/533.19.4 (KHTML, like Gecko) Version/5.0.3 Safari/533.19.4"
    [oracle@hadoop]$
    

    Всего в нашем распоряжении имеется около 1.7 Тб текстовых данных. Нам необходимо отсортировать 4006 файлов, каждая запись которых начинается с IP адреса и имени пользователя. Мы хотим получить список, в котором все записи для одного IP сгруппированы вместе и отсортированы по имени пользователя. И это именно то, что делает наш тестовый hadoop скрипт выше. В то же время, по умолчанию hadoop будет использовать всего один "reducer", что явно недостаточно для обработки такого объёма записей. Таким образом, нам необходимо использовать достаточное количество "reducer"ов, работающих параллельно на узлах нашего кластера.

    В конце работы задачи hadoop мы получим множество выходных файлов (равное количеству "reducer"ов). И нам понадобится сделать ещё один "проход" скрипта hadoop, для объединения всех выходных файлов в один финальный результирующий файл.

    Решение может быть более эффективным, но мы хотим продемонстрировать общий подход к обработке данных на кластере hadoop и будем следовать изложенному выше тестовому плану. Для улучшения производительности нашей задачи hadoop, нам надо написать более эффективный "map" скрипт, способный "отфильтровать" входные записи и вывести только интересующие нас поля - IP адрес и имя пользователя, таким образом существенно снижая объём данных на входе в "reducer". В нашем случае "map" представлен shell скриптом:

    [oracle@hadoop]$ cat /tmp/map.sh
    #!/bin/bash
    # Читает строку HTTP лога и выводит только IP и username, например:
    # 165.204.446.599 - 1437718
    cut -d' ' -f1-3
    [oracle@hadoop]$
    

    Для успешной работы hadoop этот скрипт должен быть скопирован на каждый узел кластера:

    [root@hadoop]# dcli -f /tmp/map.sh -d /tmp/map.sh
    [root@hadoop]# dcli "chown oracle:dba /tmp/{r,m}*.sh"
    [root@hadoop]# dcli "chmod a+x /tmp/{r,m}*.sh"
    [oracle@hadoop]$ dcli ls -la /tmp/map.sh
    192.168.10.1: -rwxr-xr-x 1 oracle dba 111 Jul 18 20:10 /tmp/map.sh
    192.168.10.2: -rwxr-xr-x 1 oracle dba 111 Jul 18 20:10 /tmp/map.sh
    192.168.10.3: -rwxr-xr-x 1 oracle dba 111 Jul 18 20:10 /tmp/map.sh
    192.168.10.4: -rwxr-xr-x 1 oracle dba 111 Jul 18 20:10 /tmp/map.sh
    192.168.10.5: -rwxr-xr-x 1 oracle dba 111 Jul 18 20:10 /tmp/map.sh
    192.168.10.6: -rwxr-xr-x 1 oracle dba 111 Jul 18 20:10 /tmp/map.sh
    192.168.10.7: -rwxr-xr-x 1 oracle dba 111 Jul 18 20:10 /tmp/map.sh
    192.168.10.8: -rwxr-xr-x 1 oracle dba 111 Jul 18 20:10 /tmp/map.sh
    192.168.10.9: -rwxr-xr-x 1 oracle dba 111 Jul 18 20:10 /tmp/map.sh
    192.168.10.10: -rwxr-xr-x 1 oracle dba 111 Jul 18 20:10 /tmp/map.sh
    192.168.10.11: -rwxr-xr-x 1 oracle dba 111 Jul 18 20:10 /tmp/map.sh
    192.168.10.12: -rwxr-xr-x 1 oracle dba 111 Jul 18 20:10 /tmp/map.sh
    192.168.10.13: -rwxr-xr-x 1 oracle dba 111 Jul 18 20:10 /tmp/map.sh
    192.168.10.14: -rwxr-xr-x 1 oracle dba 111 Jul 18 20:10 /tmp/map.sh
    192.168.10.15: -rwxr-xr-x 1 oracle dba 111 Jul 18 20:10 /tmp/map.sh
    192.168.10.16: -rwxr-xr-x 1 oracle dba 111 Jul 18 20:10 /tmp/map.sh
    192.168.10.17: -rwxr-xr-x 1 oracle dba 111 Jul 18 20:10 /tmp/map.sh
    192.168.10.18: -rwxr-xr-x 1 oracle dba 111 Jul 18 20:10 /tmp/map.sh
    [oracle@hadoop]$
    

    Первый запуск на реальных данных - обработка "сырых" записей

    Как уже было сказано, мы должны увеличить число "reducer"ов (до 100 в этом примере):

    [oracle@hadoop]$ cat /tmp/1.sh
    # Очистить директорию для вывода
    hadoop dfs -rmr /user/oracle/hadoop/weblog/input/output
    # Запустить задачу
    hadoop jar /usr/lib/hadoop-0.20/contrib/streaming/hadoop-streaming-0.20.2-cdh3u3b.jar \
    -Dmapred.reduce.tasks=100 \
    -Dmapred.job.name="MR-Big-2Tb-Test" \
    -input /user/oracle/hadoop/apache_logs_tb \
    -output /user/oracle/hadoop/weblog/input/output \
    -mapper /tmp/map.sh \
    -reducer /bin/sort
    
    # Проверить результат
    hadoop fs -ls /user/oracle/hadoop/weblog/input/output | wc -l
    hadoop fs -tail /user/oracle/hadoop/weblog/input/output/part-00000
    

    Запустим скрипт и, пока он выполняется, проверим распределение нагрузки между узлами кластера:

    [oracle@hadoop]$ dcli "ps -ef | grep /tmp/map.sh | wc -l"
    192.168.10.1: 16
    192.168.10.2: 2
    192.168.10.3: 2
    192.168.10.4: 14
    192.168.10.5: 16
    192.168.10.6: 13
    192.168.10.7: 15
    192.168.10.8: 14
    192.168.10.9: 16
    192.168.10.10: 15
    192.168.10.11: 14
    192.168.10.12: 13
    192.168.10.13: 14
    192.168.10.14: 13
    192.168.10.15: 16
    192.168.10.16: 10
    192.168.10.17: 13
    192.168.10.18: 13
    [oracle@hadoop]$
    

    Более подробная информация о выполнении задачи может быть получена на странице Cloudera manager по ссылке "JobTracker Web UI".

    Через 15-20 минут сортировка закончится, вот сообщения во время работы скрипта:

    .............. == Послание к Галатам святого апостола Павла == .................
    === Глава 4, Стих 2 ===
    1 Еще скажу: наследник, доколе в детстве, ничем не отличается от  раба,  хотя  и
    господин всего:
    2 он подчинен попечителям  и  домоправителям  до  срока,  отцом  _назначенного._
    3 Так и мы, доколе были в детстве, были порабощены  вещественным  началам  мира;
    4 но когда пришла  полнота  времени,  Бог  послал  Сына  Своего  [Единородного],
    Который родился от жены, подчинился закону,
    5 чтобы    искупить    подзаконных,    дабы    нам    получить      усыновление.
    
    (b+/b-, c+/c-, +/-, *) >
    
    [oracle@hadoop]$ /tmp/1.sh
    Deleted hdfs://node01/user/oracle/hadoop/weblog/input/output
    packageJobJar: [/tmp/hadoop-oracle/hadoop-unjar2346688814616659663/] []
    /tmp/streamjob2110425387383162226.jar tmpDir=null
    12/07/18 20:20:58 WARN snappy.LoadSnappy: Snappy native library is available
    12/07/18 20:20:58 INFO util.NativeCodeLoader: Loaded the native-hadoop library
    12/07/18 20:20:58 INFO snappy.LoadSnappy: Snappy native library loaded
    12/07/18 20:20:58 INFO mapred.FileInputFormat: Total input paths to process : 4005
    12/07/18 20:21:00 INFO streaming.StreamJob: getLocalDirs(): [/tmp/hadoop-oracle/mapred/local]
    12/07/18 20:21:00 INFO streaming.StreamJob: Running job: job_201206210023_0194
    12/07/18 20:21:00 INFO streaming.StreamJob: To kill this job, run:
    12/07/18 20:21:00 INFO streaming.StreamJob: /usr/lib/hadoop-0.20/bin/hadoop job
    -Dmapred.job.tracker=node03:8021 -kill job_201206210023_0194
    12/07/18 20:21:00 INFO streaming.StreamJob:
    Tracking URL: http://0.0.0.0:50030/jobdetails.jsp?jobid=job_201206210023_0194
    12/07/18 20:21:01 INFO streaming.StreamJob:  map 0%  reduce 0%
    12/07/18 20:21:13 INFO streaming.StreamJob:  map 1%  reduce 0%
    ...
    12/07/18 20:30:32 INFO streaming.StreamJob:  map 99%  reduce 32%
    12/07/18 20:30:39 INFO streaming.StreamJob:  map 100%  reduce 33%
    12/07/18 20:30:50 INFO streaming.StreamJob:  map 100%  reduce 35%
    ...
    12/07/18 20:38:44 INFO streaming.StreamJob:  map 100%  reduce 99%
    12/07/18 20:38:59 INFO streaming.StreamJob:  map 100%  reduce 100%
    
    12/07/18 20:41:56 INFO streaming.StreamJob: Job complete: job_201206210023_0194
    12/07/18 20:41:56 INFO streaming.StreamJob: Output: /user/oracle/hadoop/weblog/input/output
    103
    165.204.446.598 - 1307612
    165.204.446.598 - 1307612
    165.204.446.598 - 1307612
    ...
    165.204.446.599 - 1328654
    [oracle@hadoop]$
    

    Счётчики выполнения задачи на странице "JobTracker Web UI" показывают, что все данные полностью были прочитаны из файловой системы HDFS - "HDFS_BYTES_READ" = 1,746,951,467,031 и пересланы на вход 8003-х "map" заданий - "Map input bytes" = 1,746,689,019,469. "Mapper"ы вырезали из каждой строки только 2 первых поля - "Map output bytes" = 161,939,806,380 и отправили их на вход 100 "reducer"ов. Ни одной записи не было потеряно при обработке - "Map input records" = "Reduce output records" = 5,997,840,132. И именно это число строк будет использовано в качестве исходных данных для следующей задачи hadoop.

    Неудобство заключается в большом количестве выходных файлов с именами "part-0*". Для объединения этих 100 промежуточных фрагментов в один результат нам необходимо выполнить ещё один шаг, но на этот раз с единственным "reducer".

    Второй запуск на реальных данных - объединение промежуточных результатов

    Hadoop скрипт для этого шага очень прост. Мы можем использовать "cat" как mapper и "sort" как reducer. В реальной ситуации "reducer" в обеих задачах должен быть более сложной программой, выполняющей необходимые подсчёты - определение сессий индивидуального пользователя, например.

    Почти во всех случаях использования MapReduce "mapper" фильтрует данные, снижая их общий объём на выходе, а "reducer" суммирует результаты, группируя строки и выводя только суммарные значения.

    Пытаясь предугадать будущее, мы ожидаем увидеть в результирующем файле 5,997,840,132 строк. Само собой, эта задача будет выполняться очень медленно - один экземпляр "reducer"а будет обрабатывать такое количество строк слишком долго.

    Мы можем улучшить ситуацию - поскольку нас интересуют только уникальные комбинации IP адресов и имён пользователя, использование на этапе "map" утилиты "uniq" позволит нам предотвратить появления одной и той же строки множество раз. Опять же, эта программа будет включена в скрипт, который должен быть скопирован на каждый узел кластера (не забывайте о правильных разрешениях для файла). Сам hadoop также может "разослать" необходимые файлы на узлы кластера перед началом выполнения задания, но я предпочитаю скопировать файлы вручную, что даёт мне возможность проверить разрешения и владельца файлов.

    Ниже приведен наш "mapper" скрипт для последнего шага - объединения фрагментов.

    [oracle@hadoop]$ cat /tmp/u.sh
    #! /bin/bash
    cat | uniq
    
    [oracle@hadoop]$
    

    А вот и сам hadoop скрипт для консолидации 100 промежуточных фрагментов:

    [oracle@hadoop]$ cat /tmp/2.sh
    #!/bin/bash
    # Посмотрите http://hadoop.apache.org/common/docs/r0.18.3/streaming.html
    # и http://hadoop.apache.org/common/docs/r0.18.3/mapred_tutorial.html#Reducer
    
    # Запустите этот скрипт для окончательной сортировки и объединения файлов.
    
    hadoop dfs -rmr  /user/oracle/hadoop/endresult
    
    hadoop jar /usr/lib/hadoop-0.20/contrib/streaming/hadoop-streaming-0.20.2-cdh3u3b.jar \
    -Dmapred.reduce.tasks=1 \
    -Dmapred.job.name="MR-Big-2Tb-Result-Sort-Join" \
    -input /user/oracle/hadoop/weblog/input/output \
    -output /user/oracle/hadoop/endresult \
    -mapper /tmp/u.sh \
    -reducer /bin/sort
    # По умолчанию, hadoop сортирует записи автоматически.
    # Мы просто хотим другой тип сортировки.
    
    
    hadoop fs -ls /user/oracle/hadoop/endresult
    hadoop fs -tail /user/oracle/hadoop/endresult/part-00000
    # Вот и наш конечный результат, все уникальные строки в одном файле!
    

    Запустим последний скрипт.

    ........................ == Книга пророка Малахии == ...........................
    === Глава 3, Стих 1 ===
    1 Вот, Я посылаю Ангела Моего, и он  приготовит  путь  предо  Мною,  и  внезапно
    придет в храм Свой Господь, Которого вы ищете, и  Ангел  завета,  Которого  вы
    желаете; вот, Он идет, говорит Господь Саваоф.
    
    (b+/b-, c+/c-, +/-, *) >
    
    [oracle@hadoop]$ /tmp/2.sh
    Deleted hdfs://node01/user/oracle/hadoop/endresult
    packageJobJar: [/tmp/hadoop-oracle/hadoop-unjar489523724420488488/] []
    /tmp/streamjob2737398254763666099.jar tmpDir=null
    12/07/18 22:12:58 WARN snappy.LoadSnappy: Snappy native library is available
    12/07/18 22:12:58 INFO util.NativeCodeLoader: Loaded the native-hadoop library
    12/07/18 22:12:58 INFO snappy.LoadSnappy: Snappy native library loaded
    12/07/18 22:12:58 INFO mapred.FileInputFormat: Total input paths to process : 100
    12/07/18 22:12:58 INFO streaming.StreamJob: getLocalDirs(): [/tmp/hadoop-oracle/mapred/local]
    12/07/18 22:12:58 INFO streaming.StreamJob: Running job: job_201206210023_0199
    12/07/18 22:12:58 INFO streaming.StreamJob: To kill this job, run:
    12/07/18 22:12:58 INFO streaming.StreamJob: /usr/lib/hadoop-0.20/bin/hadoop job
    -Dmapred.job.tracker=node03:8021 -kill job_201206210023_0199
    12/07/18 22:12:58 INFO streaming.StreamJob:
    Tracking URL: http://0.0.0.0:50030/jobdetails.jsp?jobid=job_201206210023_0199
    12/07/18 22:12:59 INFO streaming.StreamJob:  map 0%  reduce 0%
    12/07/18 22:13:09 INFO streaming.StreamJob:  map 4%  reduce 0%
    ...
    12/07/18 22:15:20 INFO streaming.StreamJob:  map 99%  reduce 26%
    12/07/18 22:15:23 INFO streaming.StreamJob:  map 100%  reduce 26%
    ...
    12/07/18 22:22:49 INFO streaming.StreamJob: Job complete: job_201206210023_0199
    12/07/18 22:22:49 INFO streaming.StreamJob: Output: /user/oracle/hadoop/endresult
    Found 3 items
    -rw-r--r--   3 oracle hadoop          0 2012-07-18 22:22 /user/oracle/hadoop/endresult/_SUCCESS
    drwxrwxr-x   - oracle hadoop          0 2012-07-18 22:12 /user/oracle/hadoop/endresult/_logs
    -rw-r--r--   3 oracle hadoop 2143875432 2012-07-18 22:15 /user/oracle/hadoop/endresult/part-00000
    165.204.446.599 - 1372914
    165.204.446.599 - 1376753
    165.204.446.599 - 1379188
    165.204.446.599 - 1380558
    165.204.446.599 - 1381517
    165.204.446.599 - 1385567
    165.204.446.599 - 1386688
    165.204.446.599 - 1388204
    165.204.446.599 - 1388265
    165.204.446.599 - 1388732
    165.204.446.599 - 1389427
    165.204.446.599 - 1391459
    165.204.446.599 - 1393704
    165.204.446.599 - 1397250
    165.204.446.599 - 1404830
    165.204.446.599 - 1405265
    165.204.446.599 - 1407207
    165.204.446.599 - 1409099
    165.204.446.599 - 1409601
    165.204.446.599 - 1414313
    165.204.446.599 - 1416926
    165.204.446.599 - 1421980
    165.204.446.599 - 1424325
    165.204.446.599 - 1424411
    165.204.446.599 - 1425118
    165.204.446.599 - 1425818
    165.204.446.599 - 1426787
    165.204.446.599 - 1427786
    165.204.446.599 - 1432169
    165.204.446.599 - 1433475
    165.204.446.599 - 1434815
    165.204.446.599 - 1437532
    165.204.446.599 - 1437718
    165.204.446.599 - 1439480
    165.204.446.599 - 1443885
    165.204.446.599 - 1445047
    165.204.446.599 - 1445143
    [oracle@hadoop]$
    

    На первый взгляд, вывод отсортирован верно - мы получили единственный файл с отсортированными IP и именами.

    Финальный "консолидирующий" hadoop скрипт выполнялся в течение 10 минут, при этом единственный "reducer" использовал 90% от общего времени. Задание прочло все 100 промежуточных фрагментов - "Map input records" = 5,997,840,132 (смотрите выше). Используя "uniq" на шаге "map" нам удалось значительно уменьшить число строк на входе этапа "shuffle / reduce" - "Map output records" = 79,403,204. Такое количество записей вполне может быть обработано одним "reducer"ом - "Reduce output records" = 79,403,204.

    Итак, наш конечный объединённый файл должен содержать 79,403,204 строк - проверим это.

    [oracle@hadoop]$ hadoop fs -copyToLocal /user/oracle/hadoop/endresult/part-00000 /tmp/final_result_sorted.txt
    [oracle@hadoop]$ wc -l /tmp/final_result_sorted.txt
    79403204 /tmp/final_result_sorted.txt
    [oracle@hadoop]$
    

    Заключение

    Результирующий файл "final_result_sorted.txt" может быть выгружен на ETL сервер. Используя hadoop, нам удалось снизить объём данных для последующей обработки в базе данных с 5,997,840,132 до 79,403,204 записей, и результат уже правильно отсортирован.

    Спасибо что зашли,

    Будьте благословенны!
    Денис