Falcon provides constructs to periodically bring raw data from external data sources (such as databases, drop boxes, etc.) onto Hadoop and to push derived data computed on Hadoop back onto external data sources.
As of this release, Falcon supports only relational databases (e.g. Oracle, MySQL) via JDBC as external data sources. Future releases will add support for other types of external data sources.
The following are the prerequisites for importing data from and exporting data to databases.
Note: Falcon uses Sqoop for the import/export operations. Sqoop requires the appropriate database driver to connect to the relational database. Please refer to the Sqoop documentation for any Sqoop-related questions, and make sure the database driver jar is copied into the Oozie share lib for Sqoop.
For example, in order to import from and export to MySQL, make sure the latest MySQL connector mysql-connector-java-5.1.31.jar is copied into Oozie's Sqoop share lib at /user/oozie/share/lib/{lib-dir}/sqoop/mysql-connector-java-5.1.31.jar, where the {lib-dir} value varies across Oozie deployments.
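As an illustration, the copy might be done along the following lines; the lib_* directory name mirrors the sample driver path used later in this document, and the Oozie URL is a deployment-specific placeholder.

## locate the current Oozie share lib directory (the lib_* timestamp varies per deployment)
hdfs dfs -ls /user/oozie/share/lib

## copy the MySQL JDBC driver jar into the Sqoop share lib
hdfs dfs -put mysql-connector-java-5.1.31.jar /user/oozie/share/lib/lib_20150721010816/sqoop/

## ask Oozie to refresh its share lib so the new jar is picked up
oozie admin -oozie http://oozie-host:11000/oozie -sharelibupdate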
The following example defines a Datasource entity for a MySQL database. The import operation will use the read interface with the URL "jdbc:mysql://dbhost/test", the user name "import_usr" and the password text "sqoop". The export operation, on the other hand, will use the write interface with the URL "jdbc:mysql://dbhost/test", the user name "export_usr" and a password stored in an HDFS file at "/user/ambari-qa/password-store/password_write_user".
The default credential is used if either the read or write interface does not provide its own credentials. The default credential specifies the password using the password alias feature available via the Hadoop credential functionality. Users can create a password alias using the "hadoop credential -create <alias> -provider <provider-path>" command, where <alias> is a string and <provider-path> is an HDFS JCEKS file. At runtime, the specified alias is used to look up the password, which is stored encrypted in the JCEKS file given in the providerPath element.
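For example, the alias referenced by the sample entity below could be created roughly as follows; the exact credential-shell syntax can vary slightly across Hadoop versions, and the provider path here mirrors the sample jceks file used in the entity.

## create the alias in a JCEKS keystore on HDFS; the command prompts for the password
hadoop credential create sqoop.password.alias -provider jceks://hdfs/user/ambari-qa/sqoop_password.jceks

## verify the alias was stored
hadoop credential list -provider jceks://hdfs/user/ambari-qa/sqoop_password.jceks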
The available read and write interfaces enable database administrators to segregate read and write workloads.
File: mysql-database.xml

<?xml version="1.0" encoding="UTF-8"?>
<datasource colo="west-coast" description="MySQL database on west coast" type="mysql" name="mysql-db" xmlns="uri:falcon:datasource:0.1">
    <tags>owner=foobar@ambari.apache.org, consumer=phoe@ambari.apache.org</tags>
    <interfaces>
        <!-- ***** read interface ***** -->
        <interface type="readonly" endpoint="jdbc:mysql://dbhost/test">
            <credential type="password-text">
                <userName>import_usr</userName>
                <passwordText>sqoop</passwordText>
            </credential>
        </interface>

        <!-- ***** write interface ***** -->
        <interface type="write" endpoint="jdbc:mysql://dbhost/test">
            <credential type="password-file">
                <userName>export_usr</userName>
                <passwordFile>/user/ambari-qa/password-store/password_write_user</passwordFile>
            </credential>
        </interface>

        <!-- *** default credential *** -->
        <credential type="password-alias">
            <userName>sqoop2_user</userName>
            <passwordAlias>
                <alias>sqoop.password.alias</alias>
                <providerPath>hdfs://namenode:8020/user/ambari-qa/sqoop_password.jceks</providerPath>
            </passwordAlias>
        </credential>
    </interfaces>

    <driver>
        <clazz>com.mysql.jdbc.Driver</clazz>
        <jar>/user/oozie/share/lib/lib_20150721010816/sqoop/mysql-connector-java-5.1.31.jar</jar>
    </driver>
</datasource>
The following example defines a Feed entity with IMPORT and EXPORT policies. Both the IMPORT and EXPORT operations refer to the datasource entity "mysql-db". The IMPORT operation uses the read interface and its credentials, while the EXPORT operation uses the write interface and its credentials. A feed instance is created every hour since the frequency of the Feed is hours(1), and the Feed instances are deleted after 90 days because of the retention policy.
File: customer_email_feed.xml

<?xml version="1.0" encoding="UTF-8"?>
<!-- A feed representing Hourly customer email data retained for 90 days -->
<feed description="Raw customer email feed" name="customer_feed" xmlns="uri:falcon:feed:0.1">
    <tags>externalSystem=USWestEmailServers,classification=secure</tags>
    <groups>DataImportPipeline</groups>
    <frequency>hours(1)</frequency>
    <late-arrival cut-off="hours(4)"/>
    <clusters>
        <cluster name="primaryCluster" type="source">
            <validity start="2015-12-15T00:00Z" end="2016-03-31T00:00Z"/>
            <retention limit="days(90)" action="delete"/>
            <import>
                <source name="mysql-db" tableName="simple">
                    <extract type="full">
                        <mergepolicy>snapshot</mergepolicy>
                    </extract>
                    <fields>
                        <includes>
                            <field>id</field>
                            <field>name</field>
                        </includes>
                    </fields>
                </source>
                <arguments>
                    <argument name="--split-by" value="id"/>
                    <argument name="--num-mappers" value="2"/>
                </arguments>
            </import>
            <export>
                <target name="mysql-db" tableName="simple_export">
                    <load type="insert"/>
                    <fields>
                        <includes>
                            <field>id</field>
                            <field>name</field>
                        </includes>
                    </fields>
                </target>
                <arguments>
                    <argument name="--update-key" value="id"/>
                </arguments>
            </export>
        </cluster>
    </clusters>
    <locations>
        <location type="data" path="/user/ambari-qa/falcon/demo/primary/importfeed/${YEAR}-${MONTH}-${DAY}-${HOUR}-${MINUTE}"/>
        <location type="stats" path="/none"/>
        <location type="meta" path="/none"/>
    </locations>
    <ACL owner="ambari-qa" group="users" permission="0755"/>
    <schema location="/none" provider="none"/>
</feed>
The extraction type specifies whether to pull data from the external datasource in full every time or incrementally. The mergepolicy specifies how the data is organized on Hadoop: as a snapshot or appended as time-series partitions. The valid combinations are full extraction with a snapshot merge policy, and incremental extraction with an append merge policy.
The following example defines an incremental extraction with append organization:
<import>
    <source name="mysql-db" tableName="simple">
        <extract type="incremental">
            <deltacolumn>modified_time</deltacolumn>
            <mergepolicy>append</mergepolicy>
        </extract>
        <fields>
            <includes>
                <field>id</field>
                <field>name</field>
            </includes>
        </fields>
    </source>
    <arguments>
        <argument name="--split-by" value="id"/>
        <argument name="--num-mappers" value="2"/>
    </arguments>
</import>
The fields option controls which fields get imported. By default, all fields are imported. The "includes" option imports only the specified fields, while the "excludes" option imports all fields other than those specified.
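For instance, a sketch of the "excludes" variant could look like the snippet below (the column name ssn is purely illustrative); it would import every column of the table except those listed:

<fields>
    <excludes>
        <field>ssn</field>
    </excludes>
</fields>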
The arguments section allows passing any extra arguments needed for fine-grained control over the underlying implementation, in this case Sqoop.
Once the Datasource entity and the Feed entity with import and export policies are defined, users can submit and schedule the import and export operations via the CLI and the REST API as shown below:
## submit the mysql-db datasource defined in the file mysql_datasource.xml
falcon entity -submit -type datasource -file mysql_datasource.xml

## submit the customer_feed specified in the customer_email_feed.xml
falcon entity -submit -type feed -file customer_email_feed.xml

## schedule the customer_feed
falcon entity -schedule -type feed -name customer_feed
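The same submit and schedule operations can also be issued over the REST API. The sketch below is illustrative only: the Falcon host, port and user name are placeholders, and a secure deployment would additionally require authentication options.

## submit the mysql-db datasource over REST
curl -X POST -H "Content-Type: text/xml" --data-binary @mysql_datasource.xml "http://falcon-host:15000/api/entities/submit/datasource?user.name=ambari-qa"

## submit and schedule the customer_feed over REST
curl -X POST -H "Content-Type: text/xml" --data-binary @customer_email_feed.xml "http://falcon-host:15000/api/entities/submit/feed?user.name=ambari-qa"
curl -X POST "http://falcon-host:15000/api/entities/schedule/feed/customer_feed?user.name=ambari-qa"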
Falcon will create the corresponding Oozie bundles with a coordinator and workflow for the import and export operations.
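To verify that feed instances are being materialized as expected, the Falcon instance CLI can be queried; the time window below is illustrative and should fall within the feed's validity range.

## check the status of feed instances in a given window
falcon instance -type feed -name customer_feed -status -start 2015-12-15T00:00Z -end 2015-12-16T00:00Z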