"Нас Атакуют!" Изобличи козни лукавого, запрети диаволу
Примеры использования Hadoop без программирования на Java
Технология "Больших Данных" (Big Data) вошла в моду и используется для обработки больших объёмов неструктурированных данных, подготовки, предварительной проверки и сортировки записей перед загрузкой в хранилища данных и во многих совершенно новых областях знаний. В то же время, подготовка программ для распределённых вычислений по методике MapReduce требует уверенного знания Java. И хотя имеющиеся в арсенале hadoop средства позволяют решать большинство задач без необходимости программирования, достоинства таких средств и даже само существование их являются сюрпризом для многих пользователей системы.
Предлагаемая читателю заметка представляет некоторые простые случаи использования hadoop для решения прикладных задач без необходимости программирования, что делает систему привлекательной для простых пользователей - непрограммистов.
Прежде чем мы продолжим, я хотел бы привести строки из Евангелия:
............... == Соборное послание святого апостола Иуды == ..................
=== Глава 1, Стих 25 ===
24 Могущему же соблюсти вас от падения и поставить пред славою Своею
непорочными в радости,
25 Единому Премудрому Богу, Спасителю нашему чрез Иисуса Христа Господа
нашего, слава и величие, сила и власть прежде всех веков, ныне и во все
веки. Аминь.
Лично для вас благая весть - Единородный Сын Божий Иисус Христос любит вас, Он взошёл на крест за ваши грехи, был распят и на третий день воскрес, сел одесную Бога и открыл нам дорогу в Царствие Небесное.
Святая Троица - Отец, Сын и Дух Святой - оберегает нас в нашей жизни, милосердно ожидая нашего покаяния в тяжких грехах, совершённых нами в прошлом. Кто-то, незнакомый с Иисусом Христом, может считать что он не сделал ничего зазорного и не нуждается в прощении. Как жестоко ошибается такой человек, какую незавидную участь он готовит для себя, буквально сам лишая себя Божией милости!
Каждый день, явно или неявно, нас искушает диавол, стремясь повесить ярмо нам на шеи, согнув под свою власть. И только Господь может соблюсти нас от падения, защитить и оправдать нас, поставив непорочными в радости, по Его безграничной милости к творениям рук Его. Но прежде всего каждый из нас должен задуматься о своей жизни, осознать как много грешил он в прошлом. Раскайся в грехах своих, признай славу и величие Господа - и только тогда Его сила и власть спасут и оправдают тебя. Твоя же обязанность - трезвиться и не допускать новых падений.
Покайтесь, примите Иисуса как вашего Спасителя, ибо наступают последние времена и время близко - стоит Судья у ворот.
Пожалуйста, в своих каждодневных трудах, какими бы занятыми вы себе ни казались - находите время для Бога, Его заповедей и Библии.
На главной странице этого сайта вы найдете программу для чтения Библии в командной строке - буду очень рад если программа окажется полезной. Пожалуйста, читайте Библию, на экране или в печатном виде - вы будете искренне удивлены как много там сказано лично про вас и ваши обстоятельства.
Вернёмся к нашим техническим деталям.
Итак, определим действительно ли hadoop так сложен для нас, как принято считать? Сама инфраструктура, без сомнения, представляет из себя очень современную сложную и высоко технологичную систему. Это становится очевидным после беглого взгляда на предложения Big Data от "больших компаний с мировым именем" - за исключением одной, все остальные просто-напросто копируют тот самый hadoop, навешивая на него дополнительные, якобы очень полезные и революционные, возможности (и ценник с впечатляющим количеством нулей). Ни один из "корпоративных бегемотов" не решается разработать собственную реализацию, очевидно опасаясь позорного фиаско. Но что же делать пользователю системы?
Пользователями MapReduce framework могут быть разные категории ИТ профессионалов - разработчики, администраторы - или обычные бизнес-пользователи. Для каждой категории людей hadoop имеет свои средства. Разработчики, используя Java, могут воздействовать на функциональность системы в полной мере и настолько глубоко, насколько необходимо. Прочие могут использовать системы типа Pig для "прозрачного" создания и выполнения задач MapReduce. Но самым простым, гибким и очень эффективным средством работы с hadoop является так называемый "streaming".
Идея проста - если пользователь в состоянии написать скрипт или программу, принимающую данные со стандартного ввода, обрабатывающую их надлежащим образом и выдающую результат на стандартный вывод - такой скрипт может быть "подключён" в самое сердце hadoop и будет выполнять те самые высокоэффективные параллельные вычисления на каждом из узлов кластера. Да, именно так - просто и эффективно. Никакого знания Java или MapReduce API нам не потребуется.
Тестирование будет производиться на уже имеющемся кластере. Основная идея - предварительная выборка и сортировка данных для последующей загрузки в базу данных. Для начала мы проведём простенький тест, для оценки работоспособности нашего кластера и написанных скриптов. Затем те же самые скрипты будут использованы для сортировки около 2Тб логов вебсервера. Такой подход позволит нам сэкономить ресурсы нашего сервера баз данных и ускорить последующую обработку.
Необходимые средства
Для выполнения поставленной задачи быстро и легко нам понадобится серьёзное оборудование, уже установленное и налаженное в нашей серверной комнате. Если у вас нет доступа к hadoop кластеру - всё равно существуют варианты. Я бы порекомендовал провести тестирование работоспособности скриптов на виртуальной машине, предлагаемой компанией Cloudera. При наличии средств время на hadoop кластере может быть куплено у одного из множества провайдеров в Интернете. Либо, если имеется необходимое количество компьютеров, вы можете построить свой кластер используя бесплатный продукт той же самой Cloudera. Я настоятельно не рекомендую пробовать установить кластер из оригинальной версии с сайта Apache.
Опыт программирования на Java нам не понадобится.
План действий
Мы должны выполнить определённую последовательность действий:
Отсортированный конечный файл может быть помещён на файловую систему, с которой наша база данных может загружать данные. В случае использования Оракл наиболее эффективное средство - внешние таблицы.
Начальная проверка функциональности
Как было сказано, прежде чем мы начнём сортировать терабайты данных, что может занять часы, нам необходимо провести простой функциональный тест. Я подготовил 4 маленьких тестовых файла, содержащих неотсортированные записи на английском и русском языках - мы последовательно выполним вышеописанные действия над этими файлами. Основная задача - убедиться что наши "mapper" и "reducer" понятны инфраструктуре hadoop.
Исходные данные
Эти файлы были созданы и хранятся на локальной файловой системе одного из узлов моего hadoop кластера. Все действия будут выполняться непосредственно на узлах кластера.
file1.txt ---------- высокая производительность и минимальное время отклика для hadoop Рекомендуем использовать 10 Gbps Ethernet или Infiniband для HDFS file2.txt ---------- Обычно конечный набор данных будет содержать единственный текстовый файл который может быть помещён file3.txt --------- hadoop cluster работает и в состоянии понять наши mappers и reducers file4.txt ---------- Жили-были два кота восемь лапок два хвоста Если знаешь this стишок тo и знаешь who Маршак is
Проверим наличие этих файлов на локальной файловой системе:
.............. == Соборное послание святого апостола Иакова == ................. === Глава 3, Стих 4 === 4 Вот, и корабли, как ни велики они и как ни сильными ветрами носятся, небольшим рулем направляются, куда хочет кормчий; 5 так и язык -- небольшой член, но много делает. Посмотри, небольшой огонь как много вещества зажигает! 6 И язык -- огонь, прикраса неправды; язык в таком положении находится между членами нашими, что оскверняет все тело и воспаляет круг жизни, будучи сам воспаляем от геенны. (b+/b-, c+/c-, +/-, *) > [oracle@hadoop]$ pwd /home/oracle/htest [oracle@hadoop]$ ls -latrh total 28K -rw-rw-r-- 1 oracle oracle 73 Jul 17 20:22 file1.txt -rw-rw-r-- 1 oracle oracle 79 Jul 17 20:22 file2.txt -rw-rw-r-- 1 oracle oracle 70 Jul 17 20:22 file3.txt -rw-rw-r-- 1 oracle oracle 56 Jul 17 20:22 file4.txt drwx------ 43 oracle oracle 4.0K Jul 17 20:22 .. drwxrwxr-x 2 oracle oracle 4.0K Jul 17 20:22 .
Перемещение готовых данных на HDFS
Копировать данные на HDFS просто - мы используем сам hadoop в качестве утилиты командной строки. В конце концов, ведь именно слоны перемещают тяжести, правда?
............. == Послание к Римлянам святого апостола Павла == ................. === Глава 10, Стих 10 === 9 Ибо если устами твоими будешь исповедывать Иисуса Господом и сердцем твоим веровать, что Бог воскресил Его из мертвых, то спасешься, 10 потому что сердцем веруют к праведности, а устами исповедуют ко спасению. (b+/b-, c+/c-, +/-, *) > [oracle@hadoop]$ hadoop fs -ls /user/oracle Found 3 items drwx------ - oracle hadoop 0 2012-07-05 05:42 /user/oracle/.staging drwxr-xr-x - oracle hadoop 0 2012-06-17 18:06 /user/oracle/logs drwxrwxrwx - oracle hadoop 0 2012-06-25 12:45 /user/oracle/olhcache [oracle@hadoop]$ hadoop fs -mkdir /user/oracle/data [oracle@hadoop]$ hadoop fs -ls /user/oracle Found 4 items drwx------ - oracle hadoop 0 2012-07-05 05:42 /user/oracle/.staging drwxr-xr-x - oracle hadoop 0 2012-07-17 20:28 /user/oracle/data drwxr-xr-x - oracle hadoop 0 2012-06-17 18:06 /user/oracle/logs drwxrwxrwx - oracle hadoop 0 2012-06-25 12:45 /user/oracle/olhcache [oracle@hadoop]$ [oracle@hadoop]$ hadoop fs -copyFromLocal *.txt /user/oracle/data/ [oracle@hadoop]$ hadoop fs -ls /user/oracle/data Found 4 items -rw-r--r-- 3 oracle hadoop 73 2012-07-17 20:29 /user/oracle/data/file1.txt -rw-r--r-- 3 oracle hadoop 79 2012-07-17 20:29 /user/oracle/data/file2.txt -rw-r--r-- 3 oracle hadoop 70 2012-07-17 20:29 /user/oracle/data/file3.txt -rw-r--r-- 3 oracle hadoop 56 2012-07-17 20:29 /user/oracle/data/file4.txt [oracle@hadoop]$ hadoop fs -tail /user/oracle/data/file4.txt Жили-были два кота восемь лапок два хвоста Если знаешь this стишок тo и знаешь who Маршак is [oracle@hadoop]$
Итак, все наши файлы уже видны всем компонентам инфраструктуры hadoop и могут быть использованы всеми узлами кластера. Слонёнок справился со своим первым заданием на отлично!
Написание "mapper" и "reducer" скриптов
Что радует - эти скрипты будут очень простые и универсальные, мы сможем использовать их как шаблон для последующих задач. Код скрипта выглядит так:
hadoop fs -rmr /user/oracle/result hadoop jar /usr/lib/hadoop-0.20/contrib/streaming/hadoop-streaming-0.20.2-cdh3u4.jar \ > -input /user/oracle/data \ > -output /user/oracle/result \ > -mapper /bin/cat
Мы используем одну из возможностей hadoop - упомянутый в начале заметки "streaming", но с Java библиотекой, переработанной для нас компанией Cloudera. Вы можете также попробовать использовать JAR файл, предоставляемый самим проектом Apache Hadoop.
Но посмотрите на "mapper" - это не совпадение, мы используем стандартную команду линукса "cat". Эта программа просто копирует ввод на вывод. Зачем? Давайте запустим наш hadoop скрипт и посмотрим. Кстати, в нашем первом скрипте мы не используем "reducer".
Выполнение основной задачи hadoop
И снова мы поручим всю тяжелую работу нашему слонёнку и просто запустим скрипт:
................... == От Луки святое благовествование == ...................... === Глава 17, Стих 21 === 20 Быв же спрошен фарисеями, когда придет Царствие Божие, отвечал им: не придет Царствие Божие приметным образом, 21 и не скажут: вот, оно здесь, или: вот, там. Ибо вот, Царствие Божие внутрь вас есть. (b+/b-, c+/c-, +/-, *) > [oracle@hadoop]$ hadoop jar /usr/lib/hadoop-0.20/contrib/streaming/hadoop-streaming-0.20.2-cdh3u4.jar \ -input /user/oracle/data -output /user/oracle/result -mapper /bin/cat packageJobJar: [/tmp/hadoop-oracle/hadoop-unjar7954453134228920110/] [] /tmp/streamjob2262620489771001950.jar tmpDir=null 12/07/17 20:41:47 WARN snappy.LoadSnappy: Snappy native library is available 12/07/17 20:41:47 INFO util.NativeCodeLoader: Loaded the native-hadoop library 12/07/17 20:41:47 INFO snappy.LoadSnappy: Snappy native library loaded 12/07/17 20:41:47 INFO mapred.FileInputFormat: Total input paths to process : 4 12/07/17 20:41:47 INFO streaming.StreamJob: getLocalDirs(): [/tmp/hadoop-oracle/mapred/local] 12/07/17 20:41:47 INFO streaming.StreamJob: Running job: job_201207171943_0004 12/07/17 20:41:47 INFO streaming.StreamJob: To kill this job, run: 12/07/17 20:41:47 INFO streaming.StreamJob: /usr/lib/hadoop-0.20/bin/hadoop job -Dmapred.job.tracker=localhost.localdomain:8021 -kill job_201207171943_0004 12/07/17 20:41:47 INFO streaming.StreamJob: Tracking URL: http://localhost.localdomain:50030/jobdetails.jsp?jobid=job_201207171943_0004 12/07/17 20:41:48 INFO streaming.StreamJob: map 0% reduce 0% 12/07/17 20:41:57 INFO streaming.StreamJob: map 50% reduce 0% 12/07/17 20:42:03 INFO streaming.StreamJob: map 100% reduce 0% 12/07/17 20:42:09 INFO streaming.StreamJob: map 100% reduce 33% 12/07/17 20:42:11 INFO streaming.StreamJob: map 100% reduce 100% 12/07/17 20:42:14 INFO streaming.StreamJob: Job complete: job_201207171943_0004 12/07/17 20:42:14 INFO streaming.StreamJob: Output: /user/oracle/result
Заметьте - hadoop нашёл и прочитал все входные файлы, о чём свидетельствует запись "Total input paths to process : 4". Сообщение "Job complete" приглашает нас взглянуть на результат работы hadoop скрипта:
..................... == Первая книга Паралипоменон == ......................... === Глава 10, Стих 14 === 13 Так умер Саул за свое беззаконие, которое он сделал пред Господом, за то, что не соблюл слова Господня и обратился к волшебнице с вопросом, 14 а не взыскал Господа. _За_ _то_ Он и умертвил его, и передал царство Давиду, сыну Иессееву. (b+/b-, c+/c-, +/-, *) > oracle@hadoop]$ hadoop fs -ls /user/oracle/result Found 3 items -rw-r--r-- 3 oracle hadoop 0 2012-07-17 20:42 /user/oracle/result/_SUCCESS drwxrwxrwx - oracle hadoop 0 2012-07-17 20:41 /user/oracle/result/_logs -rw-r--r-- 3 oracle hadoop 323 2012-07-17 20:42 /user/oracle/result/part-00000
Нас интересуют файлы с именем "part-*", из их имён понятно на сколько фрагментов был разбит выходной файл. Что же мы ожидаем увидеть на выходе? Спецификация hadoop требует сортировки данных после этапа "map", таким образом, даже при отсутствии "reducer", мы надеемся найти в выходном файле правильно отсортированные строки из всех четырёх входных файлов.
[oracle@hadoop]$ hadoop fs -cat /user/oracle/result/part-00000 10 Ethernet Gbps HDFS Infiniband cluster hadoop hadoop is mappers reducers this who Если Жили-были Маршак Обычно Рекомендуем будет быть в восемь время высокая данных два два для для единственный знаешь знаешь и и и и или использовать конечный кота который лапок минимальное может набор наши отклика помещён понять производительность работает содержать состоянии стишок тo текстовый файл хвоста [oracle@hadoop]$
Порядок слов выглядит правильным, но всегда лучше проверить дважды. Я перенаправлю вывод на вход стандартной утилиты линукс "sort":
[oracle@hadoop]$ hadoop fs -cat /user/oracle/result/part-00000 | sort 10 cluster Ethernet Gbps hadoop hadoop HDFS Infiniband is mappers reducers this who будет быть в восемь время высокая данных два два для для единственный Если Жили-были знаешь знаешь и и и и или использовать конечный кота который лапок Маршак минимальное может набор наши Обычно отклика помещён понять производительность работает Рекомендуем содержать состоянии стишок тo текстовый файл хвоста [oracle@hadoop]$
Этот второй вариант сортировки мне нравится больше первого! Могу ли я заставить hadoop воспроизвести такой же результат? Да, и именно для этого нам понадобится "reducer" - утилита "sort". В этом случае вывод всех "mapper"ов (cat) будет перенаправлен на вход "reducer"а (sort), производя желаемый результат.
Повторный запуск скрипта с "reducer"
Изменим наш скрипт-шаблон и запустим его вновь. Но прежде всего не забудьте скопировать с HDFS на локальный диск ваши прошлые результаты - скрипт полностью удалит и создаст заново директорию "result".
hadoop fs -rmr /user/oracle/result hadoop jar /usr/lib/hadoop-0.20/contrib/streaming/hadoop-streaming-0.20.2-cdh3u4.jar \ > -input /user/oracle/data \ > -output /user/oracle/result \ > -mapper /bin/cat \ > -reducer /bin/sort
Как видно, мы добавили "reducer". Перезапустим скрипт вторично:
........... == Второе соборное послание святого апостола Петра == .............. === Глава 2, Стих 9 === 4 Ибо, если Бог ангелов согрешивших не пощадил, но, связав узами адского мрака, предал блюсти на суд для наказания; 5 и если не пощадил первого мира, но в восьми душах сохранил семейство Ноя, проповедника правды, когда навел потоп на мир нечестивых; 6 и если города Содомские и Гоморрские, осудив на истребление, превратил в пепел, показав пример будущим нечестивцам, 7 а праведного Лота, утомленного обращением между людьми неистово развратными, избавил 8 (ибо сей праведник, живя между ними, ежедневно мучился в праведной душе, видя и слыша дела беззаконные) -- 9 то, конечно, знает Господь, как избавлять благочестивых от искушения, а беззаконников соблюдать ко дню суда, для наказания, 10 а наипаче тех, которые идут вслед скверных похотей плоти, презирают начальства, дерзки, своевольны и не страшатся злословить высших, 11 тогда как и Ангелы, превосходя их крепостью и силою, не произносят на них пред Господом укоризненного суда. (b+/b-, c+/c-, +/-, *) > [oracle@hadoop]$ hadoop jar /usr/lib/hadoop-0.20/contrib/streaming/hadoop-streaming-0.20.2-cdh3u4.jar \ > -input /user/oracle/data \ > -output /user/oracle/result \ > -mapper /bin/cat \ > -reducer /bin/sort packageJobJar: [/tmp/hadoop-oracle/hadoop-unjar3453045901145794770/] [] /tmp/streamjob1539054464027589119.jar tmpDir=null 12/07/17 20:58:18 WARN snappy.LoadSnappy: Snappy native library is available 12/07/17 20:58:18 INFO util.NativeCodeLoader: Loaded the native-hadoop library 12/07/17 20:58:18 INFO snappy.LoadSnappy: Snappy native library loaded 12/07/17 20:58:18 INFO mapred.FileInputFormat: Total input paths to process : 4 12/07/17 20:58:19 INFO streaming.StreamJob: getLocalDirs(): [/tmp/hadoop-oracle/mapred/local] 12/07/17 20:58:19 INFO streaming.StreamJob: Running job: job_201207171943_0005 12/07/17 20:58:19 INFO streaming.StreamJob: To kill this job, run: 12/07/17 20:58:19 INFO streaming.StreamJob: /usr/lib/hadoop-0.20/bin/hadoop job -Dmapred.job.tracker=localhost.localdomain:8021 -kill job_201207171943_0005 12/07/17 20:58:19 INFO streaming.StreamJob: Tracking URL: http://localhost.localdomain:50030/jobdetails.jsp?jobid=job_201207171943_0005 12/07/17 20:58:20 INFO streaming.StreamJob: map 0% reduce 0% 12/07/17 20:58:28 INFO streaming.StreamJob: map 50% reduce 0% 12/07/17 20:58:34 INFO streaming.StreamJob: map 100% reduce 0% 12/07/17 20:58:40 INFO streaming.StreamJob: map 100% reduce 33% 12/07/17 20:58:42 INFO streaming.StreamJob: map 100% reduce 100% 12/07/17 20:58:44 INFO streaming.StreamJob: Job complete: job_201207171943_0005 12/07/17 20:58:44 INFO streaming.StreamJob: Output: /user/oracle/result [oracle@hadoop]$ [oracle@hadoop]$ hadoop fs -ls /user/oracle/result Found 3 items -rw-r--r-- 3 oracle hadoop 0 2012-07-17 20:58 /user/oracle/result/_SUCCESS drwxrwxrwx - oracle hadoop 0 2012-07-17 20:58 /user/oracle/result/_logs -rw-r--r-- 3 oracle hadoop 323 2012-07-17 20:58 /user/oracle/result/part-00000 [oracle@hadoop]$ hadoop fs -cat /user/oracle/result/part-00000 10 cluster Ethernet Gbps hadoop hadoop HDFS Infiniband is mappers reducers this who будет быть в восемь время высокая данных два два для для единственный Если Жили-были знаешь знаешь и и и и или использовать конечный кота который лапок Маршак минимальное может набор наши Обычно отклика помещён понять производительность работает Рекомендуем содержать состоянии стишок тo текстовый файл хвоста [oracle@hadoop]$
Обратите внимание, hadoop правильно работает с Unicode и в состоянии правильно отсортировать русские и английские слова. Используем средства линукс для ещё одной проверки:
[oracle@hadoop]$ hadoop fs -cat /user/oracle/result/part-00000 | sort >/tmp/2 [oracle@hadoop]$ hadoop fs -cat /user/oracle/result/part-00000 >/tmp/1 [oracle@hadoop]$ diff /tmp/1 /tmp/2 [oracle@hadoop]$
Вывод утилиты "diff" свидетельствует о полной идентичности обоих файлов - hadoop отсортировал вывод так, как нам было необходимо.
После удачного тестирования мы знаем, что наши скрипты работают правильно на hadoop кластере, используя "streaming". Впрочем, это не совсем так - внимательный читатель мог обратить внимание на имя библиотеки в предыдущих тестах и догадаться, что я использовал виртуальную машину, предоставляемую Cloudera для демонстрационных целей. И теперь настало время начать использовать настоящий кластер hadoop.
Работа с настоящими данными
Следующая задача - предварительная сортировка большого количества логов вебсервера, для их последующей загрузки средствами ETL в хранилище данных. Такой подход позволяет снизить нагрузку на базу данных и сервер ETL. Естественно, мы также могли бы включить большую часть обработки непосредственно в hadoop скрипт (например, определение длины сеанса индивидуального пользователя). Но для простоты примера я решил ограничиться сортировкой лог-записей по IP адресу и имени пользователя.
Посмотрим на формат исходных записей и их количество:
[oracle@hadoop]$ hadoop fs -ls /user/oracle/hadoop/apache_logs_tb | head Found 4005 items -rw-r--r-- 3 oracle hadoop 436586628 2012-07-05 16:51 /user/oracle/hadoop/apache_logs_tb/apache-045101650.log -rw-r--r-- 3 oracle hadoop 436796712 2012-07-05 16:51 /user/oracle/hadoop/apache_logs_tb/apache-045107619.log -rw-r--r-- 3 oracle hadoop 435800852 2012-07-05 16:51 /user/oracle/hadoop/apache_logs_tb/apache-045112610.log -rw-r--r-- 3 oracle hadoop 437105416 2012-07-05 16:51 /user/oracle/hadoop/apache_logs_tb/apache-045116792.log -rw-r--r-- 3 oracle hadoop 436683731 2012-07-05 16:51 /user/oracle/hadoop/apache_logs_tb/apache-04512141.log -rw-r--r-- 3 oracle hadoop 436746609 2012-07-05 16:51 /user/oracle/hadoop/apache_logs_tb/apache-04512525.log -rw-r--r-- 3 oracle hadoop 436834383 2012-07-05 16:51 /user/oracle/hadoop/apache_logs_tb/apache-045129394.log -rw-r--r-- 3 oracle hadoop 436602364 2012-07-05 16:51 /user/oracle/hadoop/apache_logs_tb/apache-045133688.log -rw-r--r-- 3 oracle hadoop 436491954 2012-07-05 16:51 /user/oracle/hadoop/apache_logs_tb/apache-04513801.log [oracle@hadoop]$ hadoop fs -ls /user/oracle/hadoop/apache_logs_tb | wc -l 4006 [oracle@hadoop]$ [oracle@hadoop]$ hadoop fs -dus /user/oracle/hadoop/apache_logs_tb hdfs://node01/user/oracle/hadoop/apache_logs_tb 1746689019469 [oracle@hadoop]$ [oracle@hadoop]$ hadoop fs -tail /user/oracle/hadoop/apache_logs_tb/apache-045101650.log | tail -1 102.134.002.480 - 1013506 [07/Jul/2012:04:16:12 -0700] "GET /view?prod=rack_36u_30_4in_deep_rack_black HTTP/1.0" 200 2000 "/view?prod=600gb_usb_2_0_feedback_megadisk_drive" "Mozilla/5.0 (Android 2.2; Windows; U; Windows NT 6.1; en-US) AppleWebKit/533.19.4 (KHTML, like Gecko) Version/5.0.3 Safari/533.19.4" [oracle@hadoop]$
Всего в нашем распоряжении имеется около 1.7 Тб текстовых данных. Нам необходимо отсортировать 4006 файлов, каждая запись которых начинается с IP адреса и имени пользователя. Мы хотим получить список, в котором все записи для одного IP сгруппированы вместе и отсортированы по имени пользователя. И это именно то, что делает наш тестовый hadoop скрипт выше. В то же время, по умолчанию hadoop будет использовать всего один "reducer", что явно недостаточно для обработки такого объёма записей. Таким образом, нам необходимо использовать достаточное количество "reducer"ов, работающих параллельно на узлах нашего кластера.
В конце работы задачи hadoop мы получим множество выходных файлов (равное количеству "reducer"ов). И нам понадобится сделать ещё один "проход" скрипта hadoop, для объединения всех выходных файлов в один финальный результирующий файл.
Решение может быть более эффективным, но мы хотим продемонстрировать общий подход к обработке данных на кластере hadoop и будем следовать изложенному выше тестовому плану. Для улучшения производительности нашей задачи hadoop, нам надо написать более эффективный "map" скрипт, способный "отфильтровать" входные записи и вывести только интересующие нас поля - IP адрес и имя пользователя, таким образом существенно снижая объём данных на входе в "reducer". В нашем случае "map" представлен shell скриптом:
[oracle@hadoop]$ cat /tmp/map.sh #!/bin/bash # Читает строку HTTP лога и выводит только IP и username, например: # 165.204.446.599 - 1437718 cut -d' ' -f1-3 [oracle@hadoop]$
Для успешной работы hadoop этот скрипт должен быть скопирован на каждый узел кластера:
[root@hadoop]# dcli -f /tmp/map.sh -d /tmp/map.sh [root@hadoop]# dcli "chown oracle:dba /tmp/{r,m}*.sh" [root@hadoop]# dcli "chmod a+x /tmp/{r,m}*.sh" [oracle@hadoop]$ dcli ls -la /tmp/map.sh 192.168.10.1: -rwxr-xr-x 1 oracle dba 111 Jul 18 20:10 /tmp/map.sh 192.168.10.2: -rwxr-xr-x 1 oracle dba 111 Jul 18 20:10 /tmp/map.sh 192.168.10.3: -rwxr-xr-x 1 oracle dba 111 Jul 18 20:10 /tmp/map.sh 192.168.10.4: -rwxr-xr-x 1 oracle dba 111 Jul 18 20:10 /tmp/map.sh 192.168.10.5: -rwxr-xr-x 1 oracle dba 111 Jul 18 20:10 /tmp/map.sh 192.168.10.6: -rwxr-xr-x 1 oracle dba 111 Jul 18 20:10 /tmp/map.sh 192.168.10.7: -rwxr-xr-x 1 oracle dba 111 Jul 18 20:10 /tmp/map.sh 192.168.10.8: -rwxr-xr-x 1 oracle dba 111 Jul 18 20:10 /tmp/map.sh 192.168.10.9: -rwxr-xr-x 1 oracle dba 111 Jul 18 20:10 /tmp/map.sh 192.168.10.10: -rwxr-xr-x 1 oracle dba 111 Jul 18 20:10 /tmp/map.sh 192.168.10.11: -rwxr-xr-x 1 oracle dba 111 Jul 18 20:10 /tmp/map.sh 192.168.10.12: -rwxr-xr-x 1 oracle dba 111 Jul 18 20:10 /tmp/map.sh 192.168.10.13: -rwxr-xr-x 1 oracle dba 111 Jul 18 20:10 /tmp/map.sh 192.168.10.14: -rwxr-xr-x 1 oracle dba 111 Jul 18 20:10 /tmp/map.sh 192.168.10.15: -rwxr-xr-x 1 oracle dba 111 Jul 18 20:10 /tmp/map.sh 192.168.10.16: -rwxr-xr-x 1 oracle dba 111 Jul 18 20:10 /tmp/map.sh 192.168.10.17: -rwxr-xr-x 1 oracle dba 111 Jul 18 20:10 /tmp/map.sh 192.168.10.18: -rwxr-xr-x 1 oracle dba 111 Jul 18 20:10 /tmp/map.sh [oracle@hadoop]$
Первый запуск на реальных данных - обработка "сырых" записей
Как уже было сказано, мы должны увеличить число "reducer"ов (до 100 в этом примере):
[oracle@hadoop]$ cat /tmp/1.sh # Очистить директорию для вывода hadoop dfs -rmr /user/oracle/hadoop/weblog/input/output # Запустить задачу hadoop jar /usr/lib/hadoop-0.20/contrib/streaming/hadoop-streaming-0.20.2-cdh3u3b.jar \ -Dmapred.reduce.tasks=100 \ -Dmapred.job.name="MR-Big-2Tb-Test" \ -input /user/oracle/hadoop/apache_logs_tb \ -output /user/oracle/hadoop/weblog/input/output \ -mapper /tmp/map.sh \ -reducer /bin/sort # Проверить результат hadoop fs -ls /user/oracle/hadoop/weblog/input/output | wc -l hadoop fs -tail /user/oracle/hadoop/weblog/input/output/part-00000
Запустим скрипт и, пока он выполняется, проверим распределение нагрузки между узлами кластера:
[oracle@hadoop]$ dcli "ps -ef | grep /tmp/map.sh | wc -l" 192.168.10.1: 16 192.168.10.2: 2 192.168.10.3: 2 192.168.10.4: 14 192.168.10.5: 16 192.168.10.6: 13 192.168.10.7: 15 192.168.10.8: 14 192.168.10.9: 16 192.168.10.10: 15 192.168.10.11: 14 192.168.10.12: 13 192.168.10.13: 14 192.168.10.14: 13 192.168.10.15: 16 192.168.10.16: 10 192.168.10.17: 13 192.168.10.18: 13 [oracle@hadoop]$
Более подробная информация о выполнении задачи может быть получена на странице Cloudera manager по ссылке "JobTracker Web UI".
Через 15-20 минут сортировка закончится, вот сообщения во время работы скрипта:
.............. == Послание к Галатам святого апостола Павла == ................. === Глава 4, Стих 2 === 1 Еще скажу: наследник, доколе в детстве, ничем не отличается от раба, хотя и господин всего: 2 он подчинен попечителям и домоправителям до срока, отцом _назначенного._ 3 Так и мы, доколе были в детстве, были порабощены вещественным началам мира; 4 но когда пришла полнота времени, Бог послал Сына Своего [Единородного], Который родился от жены, подчинился закону, 5 чтобы искупить подзаконных, дабы нам получить усыновление. (b+/b-, c+/c-, +/-, *) > [oracle@hadoop]$ /tmp/1.sh Deleted hdfs://node01/user/oracle/hadoop/weblog/input/output packageJobJar: [/tmp/hadoop-oracle/hadoop-unjar2346688814616659663/] [] /tmp/streamjob2110425387383162226.jar tmpDir=null 12/07/18 20:20:58 WARN snappy.LoadSnappy: Snappy native library is available 12/07/18 20:20:58 INFO util.NativeCodeLoader: Loaded the native-hadoop library 12/07/18 20:20:58 INFO snappy.LoadSnappy: Snappy native library loaded 12/07/18 20:20:58 INFO mapred.FileInputFormat: Total input paths to process : 4005 12/07/18 20:21:00 INFO streaming.StreamJob: getLocalDirs(): [/tmp/hadoop-oracle/mapred/local] 12/07/18 20:21:00 INFO streaming.StreamJob: Running job: job_201206210023_0194 12/07/18 20:21:00 INFO streaming.StreamJob: To kill this job, run: 12/07/18 20:21:00 INFO streaming.StreamJob: /usr/lib/hadoop-0.20/bin/hadoop job -Dmapred.job.tracker=node03:8021 -kill job_201206210023_0194 12/07/18 20:21:00 INFO streaming.StreamJob: Tracking URL: http://0.0.0.0:50030/jobdetails.jsp?jobid=job_201206210023_0194 12/07/18 20:21:01 INFO streaming.StreamJob: map 0% reduce 0% 12/07/18 20:21:13 INFO streaming.StreamJob: map 1% reduce 0% ... 12/07/18 20:30:32 INFO streaming.StreamJob: map 99% reduce 32% 12/07/18 20:30:39 INFO streaming.StreamJob: map 100% reduce 33% 12/07/18 20:30:50 INFO streaming.StreamJob: map 100% reduce 35% ... 12/07/18 20:38:44 INFO streaming.StreamJob: map 100% reduce 99% 12/07/18 20:38:59 INFO streaming.StreamJob: map 100% reduce 100% 12/07/18 20:41:56 INFO streaming.StreamJob: Job complete: job_201206210023_0194 12/07/18 20:41:56 INFO streaming.StreamJob: Output: /user/oracle/hadoop/weblog/input/output 103 165.204.446.598 - 1307612 165.204.446.598 - 1307612 165.204.446.598 - 1307612 ... 165.204.446.599 - 1328654 [oracle@hadoop]$
Счётчики выполнения задачи на странице "JobTracker Web UI" показывают, что все данные полностью были прочитаны из файловой системы HDFS - "HDFS_BYTES_READ" = 1,746,951,467,031 и пересланы на вход 8003-х "map" заданий - "Map input bytes" = 1,746,689,019,469. "Mapper"ы вырезали из каждой строки только 2 первых поля - "Map output bytes" = 161,939,806,380 и отправили их на вход 100 "reducer"ов. Ни одной записи не было потеряно при обработке - "Map input records" = "Reduce output records" = 5,997,840,132. И именно это число строк будет использовано в качестве исходных данных для следующей задачи hadoop.
Неудобство заключается в большом количестве выходных файлов с именами "part-0*". Для объединения этих 100 промежуточных фрагментов в один результат нам необходимо выполнить ещё один шаг, но на этот раз с единственным "reducer".
Второй запуск на реальных данных - объединение промежуточных результатов
Hadoop скрипт для этого шага очень прост. Мы можем использовать "cat" как mapper и "sort" как reducer. В реальной ситуации "reducer" в обеих задачах должен быть более сложной программой, выполняющей необходимые подсчёты - определение сессий индивидуального пользователя, например.
Почти во всех случаях использования MapReduce "mapper" фильтрует данные, снижая их общий объём на выходе, а "reducer" суммирует результаты, группируя строки и выводя только суммарные значения.
Пытаясь предугадать будущее, мы ожидаем увидеть в результирующем файле 5,997,840,132 строк. Само собой, эта задача будет выполняться очень медленно - один экземпляр "reducer"а будет обрабатывать такое количество строк слишком долго.
Мы можем улучшить ситуацию - поскольку нас интересуют только уникальные комбинации IP адресов и имён пользователя, использование на этапе "map" утилиты "uniq" позволит нам предотвратить появления одной и той же строки множество раз. Опять же, эта программа будет включена в скрипт, который должен быть скопирован на каждый узел кластера (не забывайте о правильных разрешениях для файла). Сам hadoop также может "разослать" необходимые файлы на узлы кластера перед началом выполнения задания, но я предпочитаю скопировать файлы вручную, что даёт мне возможность проверить разрешения и владельца файлов.
Ниже приведен наш "mapper" скрипт для последнего шага - объединения фрагментов.
[oracle@hadoop]$ cat /tmp/u.sh #! /bin/bash cat | uniq [oracle@hadoop]$
А вот и сам hadoop скрипт для консолидации 100 промежуточных фрагментов:
[oracle@hadoop]$ cat /tmp/2.sh #!/bin/bash # Посмотрите http://hadoop.apache.org/common/docs/r0.18.3/streaming.html # и http://hadoop.apache.org/common/docs/r0.18.3/mapred_tutorial.html#Reducer # Запустите этот скрипт для окончательной сортировки и объединения файлов. hadoop dfs -rmr /user/oracle/hadoop/endresult hadoop jar /usr/lib/hadoop-0.20/contrib/streaming/hadoop-streaming-0.20.2-cdh3u3b.jar \ -Dmapred.reduce.tasks=1 \ -Dmapred.job.name="MR-Big-2Tb-Result-Sort-Join" \ -input /user/oracle/hadoop/weblog/input/output \ -output /user/oracle/hadoop/endresult \ -mapper /tmp/u.sh \ -reducer /bin/sort # По умолчанию, hadoop сортирует записи автоматически. # Мы просто хотим другой тип сортировки. hadoop fs -ls /user/oracle/hadoop/endresult hadoop fs -tail /user/oracle/hadoop/endresult/part-00000 # Вот и наш конечный результат, все уникальные строки в одном файле!
Запустим последний скрипт.
........................ == Книга пророка Малахии == ........................... === Глава 3, Стих 1 === 1 Вот, Я посылаю Ангела Моего, и он приготовит путь предо Мною, и внезапно придет в храм Свой Господь, Которого вы ищете, и Ангел завета, Которого вы желаете; вот, Он идет, говорит Господь Саваоф. (b+/b-, c+/c-, +/-, *) > [oracle@hadoop]$ /tmp/2.sh Deleted hdfs://node01/user/oracle/hadoop/endresult packageJobJar: [/tmp/hadoop-oracle/hadoop-unjar489523724420488488/] [] /tmp/streamjob2737398254763666099.jar tmpDir=null 12/07/18 22:12:58 WARN snappy.LoadSnappy: Snappy native library is available 12/07/18 22:12:58 INFO util.NativeCodeLoader: Loaded the native-hadoop library 12/07/18 22:12:58 INFO snappy.LoadSnappy: Snappy native library loaded 12/07/18 22:12:58 INFO mapred.FileInputFormat: Total input paths to process : 100 12/07/18 22:12:58 INFO streaming.StreamJob: getLocalDirs(): [/tmp/hadoop-oracle/mapred/local] 12/07/18 22:12:58 INFO streaming.StreamJob: Running job: job_201206210023_0199 12/07/18 22:12:58 INFO streaming.StreamJob: To kill this job, run: 12/07/18 22:12:58 INFO streaming.StreamJob: /usr/lib/hadoop-0.20/bin/hadoop job -Dmapred.job.tracker=node03:8021 -kill job_201206210023_0199 12/07/18 22:12:58 INFO streaming.StreamJob: Tracking URL: http://0.0.0.0:50030/jobdetails.jsp?jobid=job_201206210023_0199 12/07/18 22:12:59 INFO streaming.StreamJob: map 0% reduce 0% 12/07/18 22:13:09 INFO streaming.StreamJob: map 4% reduce 0% ... 12/07/18 22:15:20 INFO streaming.StreamJob: map 99% reduce 26% 12/07/18 22:15:23 INFO streaming.StreamJob: map 100% reduce 26% ... 12/07/18 22:22:49 INFO streaming.StreamJob: Job complete: job_201206210023_0199 12/07/18 22:22:49 INFO streaming.StreamJob: Output: /user/oracle/hadoop/endresult Found 3 items -rw-r--r-- 3 oracle hadoop 0 2012-07-18 22:22 /user/oracle/hadoop/endresult/_SUCCESS drwxrwxr-x - oracle hadoop 0 2012-07-18 22:12 /user/oracle/hadoop/endresult/_logs -rw-r--r-- 3 oracle hadoop 2143875432 2012-07-18 22:15 /user/oracle/hadoop/endresult/part-00000 165.204.446.599 - 1372914 165.204.446.599 - 1376753 165.204.446.599 - 1379188 165.204.446.599 - 1380558 165.204.446.599 - 1381517 165.204.446.599 - 1385567 165.204.446.599 - 1386688 165.204.446.599 - 1388204 165.204.446.599 - 1388265 165.204.446.599 - 1388732 165.204.446.599 - 1389427 165.204.446.599 - 1391459 165.204.446.599 - 1393704 165.204.446.599 - 1397250 165.204.446.599 - 1404830 165.204.446.599 - 1405265 165.204.446.599 - 1407207 165.204.446.599 - 1409099 165.204.446.599 - 1409601 165.204.446.599 - 1414313 165.204.446.599 - 1416926 165.204.446.599 - 1421980 165.204.446.599 - 1424325 165.204.446.599 - 1424411 165.204.446.599 - 1425118 165.204.446.599 - 1425818 165.204.446.599 - 1426787 165.204.446.599 - 1427786 165.204.446.599 - 1432169 165.204.446.599 - 1433475 165.204.446.599 - 1434815 165.204.446.599 - 1437532 165.204.446.599 - 1437718 165.204.446.599 - 1439480 165.204.446.599 - 1443885 165.204.446.599 - 1445047 165.204.446.599 - 1445143 [oracle@hadoop]$
На первый взгляд, вывод отсортирован верно - мы получили единственный файл с отсортированными IP и именами.
Финальный "консолидирующий" hadoop скрипт выполнялся в течение 10 минут, при этом единственный "reducer" использовал 90% от общего времени. Задание прочло все 100 промежуточных фрагментов - "Map input records" = 5,997,840,132 (смотрите выше). Используя "uniq" на шаге "map" нам удалось значительно уменьшить число строк на входе этапа "shuffle / reduce" - "Map output records" = 79,403,204. Такое количество записей вполне может быть обработано одним "reducer"ом - "Reduce output records" = 79,403,204.
Итак, наш конечный объединённый файл должен содержать 79,403,204 строк - проверим это.
[oracle@hadoop]$ hadoop fs -copyToLocal /user/oracle/hadoop/endresult/part-00000 /tmp/final_result_sorted.txt [oracle@hadoop]$ wc -l /tmp/final_result_sorted.txt 79403204 /tmp/final_result_sorted.txt [oracle@hadoop]$
Заключение
Результирующий файл "final_result_sorted.txt" может быть выгружен на ETL сервер. Используя hadoop, нам удалось снизить объём данных для последующей обработки в базе данных с 5,997,840,132 до 79,403,204 записей, и результат уже правильно отсортирован.
Спасибо что зашли,
Будьте благословенны!
Денис