The cluster XSD specification is available here: A cluster contains different interfaces which are used by Falcon like readonly, write, workflow and messaging. A cluster is referenced by feeds and processes which are on-boarded to Falcon by its name.
Following are the tags defined in a cluster.xml:
<cluster colo="gs" description="" name="corp" xmlns="uri:falcon:cluster:0.1" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
The colo specifies the colo to which this cluster belongs to and name is the name of the cluster which has to be unique.
A cluster has varies interfaces as described below:
<interface type="readonly" endpoint="hftp://localhost:50010" version="0.20.2" />
A readonly interface specifies the endpoint for Hadoop's HFTP protocol, this would be used in the context of feed replication.
<interface type="write" endpoint="hdfs://localhost:8020" version="0.20.2" />
A write interface specifies the interface to write to hdfs, it's endpoint is the value of fs.default.name. Falcon uses this interface to write system data to hdfs and feeds referencing this cluster are written to hdfs using the same write interface.
<interface type="execute" endpoint="localhost:8021" version="0.20.2" />
An execute interface specifies the interface for job tracker, it's endpoint is the value of mapred.job.tracker. Falcon uses this interface to submit the processes as jobs on JobTracker defined here.
<interface type="workflow" endpoint="http://localhost:11000/oozie/" version="3.1" />
A workflow interface specifies the interface for workflow engine, example of its endpoint is the value for OOZIE_URL. Falcon uses this interface to schedule the processes referencing this cluster on workflow engine defined here.
<interface type="registry" endpoint="thrift://localhost:9083" version="0.11.0" />
A registry interface specifies the interface for metadata catalog, such as Hive Metastore (or HCatalog). Falcon uses this interface to register/de-register partitions for a given database and table. Also, uses this information to schedule data availability events based on partitions in the workflow engine. Although Hive metastore supports both RPC and HTTP, Falcon comes with an implementation for RPC over thrift.
<interface type="messaging" endpoint="tcp://localhost:61616?daemon=true" version="5.4.6" />
A messaging interface specifies the interface for sending feed availability messages, it's endpoint is broker url with tcp address.
A cluster has a list of locations defined:
<location name="staging" path="/projects/falcon/staging" />
Location has the name and the path, name is the type of locations like staging, temp and working. and path is the hdfs path for each location. Falcon would use the location to do intermediate processing of entities in hdfs and hence Falcon should have read/write/execute permission on these locations.
A cluster has a list of properties: A key-value pair, which are propagated to the workflow engine.
<property name="brokerImplClass" value="org.apache.activemq.ActiveMQConnectionFactory" />
Ideally JMS impl class name of messaging engine (brokerImplClass) should be defined here.
The Feed XSD specification is available here. a Feed defines various attributes of feed like feed location, frequency, late-arrival handling and retention policies. A feed can be scheduled on a cluster, once a feed is scheduled its retention and replication process are triggered in a given cluster.
<feed description="clicks log" name="clicks" xmlns="uri:falcon:feed:0.1" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
A feed should have a unique name and this name is referenced by processes as input or output feed.
Falcon introduces a new abstraction to encapsulate the storage for a given feed which can either be expressed as a path on the file system, File System Storage or a table in a catalog such as Hive, Catalog Storage.
<xs:choice minOccurs="1" maxOccurs="1"> <xs:element type="locations" name="locations"/> <xs:element type="catalog-table" name="table"/> </xs:choice>
Feed should contain one of the two storage options. Locations on File System or Table in a Catalog.
<clusters> <cluster name="test-cluster"> <validity start="2012-07-20T03:00Z" end="2099-07-16T00:00Z"/> <retention limit="days(10)" action="delete"/> <locations> <location type="data" path="/hdfsDataLocation/${YEAR}/${MONTH}/${DAY}/${HOUR}/${MINUTE}"/> <location type="stats" path="/projects/falcon/clicksStats" /> <location type="meta" path="/projects/falcon/clicksMetaData" /> </locations> </cluster> ..... more clusters </clusters>
Feed references a cluster by it's name, before submitting a feed all the referenced cluster should be submitted to Falcon. type: specifies whether the referenced cluster should be treated as a source or target for a feed. A feed can have multiple source and target clusters. If the type of cluster is not specified then the cluster is not considered for replication. Validity of a feed on cluster specifies duration for which this feed is valid on this cluster. Retention specifies how long the feed is retained on this cluster and the action to be taken on the feed after the expiry of retention period. The retention limit is specified by expression frequency(times), ex: if feed should be retained for at least 6 hours then retention's limit="hours(6)". The field partitionExp contains partition tags. Number of partition tags has to be equal to number of partitions specified in feed schema. A partition tag can be a wildcard(*), a static string or an expression. Atleast one of the strings has to be an expression. Location specifies where the feed is available on this cluster. This is an optional parameter and path can be same or different from the global locations tag value ( it is mentioned outside the clusters tag ) . This tag provides the user to flexibility to have feed at different locations on different clusters. If this attribute is missing then the default global location is picked from the feed definition. Also the individual location tags data, stats, meta are optional.
<location type="data" path="/projects/falcon/clicks" /> <location type="stats" path="/projects/falcon/clicksStats" /> <location type="meta" path="/projects/falcon/clicksMetaData" />
A location tag specifies the type of location like data, meta, stats and the corresponding paths for them. A feed should at least define the location for type data, which specifies the HDFS path pattern where the feed is generated periodically. ex: type="data" path="/projects/TrafficHourly/${YEAR}-${MONTH}-${DAY}/traffic" The granularity of date pattern in the path should be at least that of a frequency of a feed. Other location type which are supported are stats and meta paths, if a process references a feed then the meta and stats paths are available as a property in a process.
A table tag specifies the table URI in the catalog registry as:
catalog:$database-name:$table-name#partition-key=partition-value);partition-key=partition-value);*
This is modeled as a URI (similar to an ISBN URI). It does not have any reference to Hive or HCatalog. Its quite generic so it can be tied to other implementations of a catalog registry. The catalog implementation specified in the startup config provides implementation for the catalog URI.
Top-level partition has to be a dated pattern and the granularity of date pattern should be at least that of a frequency of a feed.
<xs:complexType name="catalog-table"> <xs:annotation> <xs:documentation> catalog specifies the uri of a Hive table along with the partition spec. uri="catalog:$database:$table#(partition-key=partition-value);+" Example: catalog:logs-db:clicks#ds=${YEAR}-${MONTH}-${DAY} </xs:documentation> </xs:annotation> <xs:attribute type="xs:string" name="uri" use="required"/> </xs:complexType>
Examples:
<table uri="catalog:default:clicks#ds=${YEAR}-${MONTH}-${DAY}-${HOUR};region=${region}" /> <table uri="catalog:src_demo_db:customer_raw#ds=${YEAR}-${MONTH}-${DAY}-${HOUR}" /> <table uri="catalog:tgt_demo_db:customer_bcp#ds=${YEAR}-${MONTH}-${DAY}-${HOUR}" />
<partitions> <partition name="country" /> <partition name="cluster" /> </partitions>
A feed can define multiple partitions, if a referenced cluster defines partitions then the number of partitions in feed has to be equal to or more than the cluster partitions.
Note: This will only apply for FileSystem storage but not Table storage as partitions are defined and maintained in Hive (HCatalog) registry.
<groups>online,bi</groups>
A feed specifies a list of comma separated groups, a group is a logical grouping of feeds and a group is said to be available if all the feeds belonging to a group are available. The frequency of all the feed which belong to the same group must be same.
<availabilityFlag>_SUCCESS</availabilityFlag>
An availabilityFlag specifies the name of a file which when present/created in a feeds data directory, the feed is termed as available. ex: _SUCCESS, if this element is ignored then Falcon would consider the presence of feed's data directory as feed availability.
<frequency>minutes(20)</frequency>
A feed has a frequency which specifies the frequency by which this feed is generated. ex: it can be generated every hour, every 5 minutes, daily, weekly etc. valid frequency type for a feed are minutes, hours, days, months. The values can be negative, zero or positive.
<late-arrival cut-off="hours(6)" />
A late-arrival specifies the cut-off period till which the feed is expected to arrive late and should be honored be processes referring to it as input feed by rerunning the instances in case the data arrives late with in a cut-off period. The cut-off period is specified by expression frequency(times), ex: if the feed can arrive late upto 8 hours then late-arrival's cut-off="hours(8)"
Note: This will only apply for FileSystem storage but not Table storage until a future time.
<properties> <property name="tmpFeedPath" value="tmpFeedPathValue" /> <property name="field2" value="value2" /> <property name="queueName" value="hadoopQueue"/> <property name="jobPriority" value="VERY_HIGH"/> <property name="timeout" value="hours(1)"/> <property name="parallel" value="3"/> <property name="maxMaps" value="8"/> <property name="mapBandwidthKB" value="1024"/> </properties>
A key-value pair, which are propagated to the workflow engine. "queueName" and "jobPriority" are special properties available to user to specify the Hadoop job queue and priority, the same value is used by Falcons launcher job. "timeout" and "parallel" are other special properties which decides replication instance's timeout value while waiting for the feed instance and parallel decides the concurrent replication instances that can run at any given time. "maxMaps" represents the maximum number of maps used during replication. "mapBandwidthKB" represents the bandwidth in KB/s used by each mapper during replication.
A process defines configuration for a workflow. A workflow is a directed acyclic graph(DAG) which defines the job for the workflow engine. A process definition defines the configurations required to run the workflow job. For example, process defines the frequency at which the workflow should run, the clusters on which the workflow should run, the inputs and outputs for the workflow, how the workflow failures should be handled, how the late inputs should be handled and so on.
The different details of process are:
Each process is identified with a unique name. Syntax:
<process name="[process name]"> ... </process>
The cluster on which the workflow should run. A process should contain one or more clusters. Cluster definition for the cluster name gives the end points for workflow execution, name node, job tracker, messaging and so on. Each cluster inturn has validity mentioned, which tell the times between which the job should run on that specified cluster. Syntax:
<process name="[process name]"> ... <clusters> <cluster name="test-cluster1"> <validity start="2012-12-21T08:15Z" end="2100-01-01T00:00Z"/> </cluster> <cluster name="test-cluster2"> <validity start="2012-12-21T08:15Z" end="2100-01-01T00:00Z"/> </cluster> .... .... </clusters> ... </process>
Parallel defines how many instances of the workflow can run concurrently. It should be a positive integer > 0. For example, parallel of 1 ensures that only one instance of the workflow can run at a time. The next instance will start only after the running instance completes. Syntax:
<process name="[process name]"> ... <parallel>[parallel]</parallel> ... </process>
Order defines the order in which the ready instances are picked up. The possible values are FIFO(First In First Out), LIFO(Last In First Out), and ONLYLAST(Last Only). Syntax:
<process name="[process name]"> ... <order>[order]</order> ... </process>
A optional Timeout specifies the maximum time an instance waits for a dataset before being killed by the workflow engine, a time out is specified like frequency. If timeout is not specified, falcon computes a default timeout for a process based on its frequency, which is six times of the frequency of process or 30 minutes if computed timeout is less than 30 minutes.
<process name="[process name]"> ... <timeout>[timeunit]([frequency])</timeout> ... </process>
Frequency defines how frequently the workflow job should run. For example, hours(1) defines the frequency as hourly, days(7) defines weekly frequency. The values for timeunit can be minutes/hours/days/months and the frequency number should be a positive integer > 0. Syntax:
<process name="[process name]"> ... <frequency>[timeunit]([frequency])</order> ... </process>
Validity defines how long the workflow should run. It has 3 components - start time, end time and timezone. Start time and end time are timestamps defined in yyyy-MM-dd'T'HH:mm'Z' format and should always be in UTC. Timezone is used to compute the next instances starting from start time. The workflow will start at start time and end before end time specified on a given cluster. So, there will not be a workflow instance at end time. Syntax:
<process name="[process name]"> ... <validity start=[start time] end=[end time] timezone=[timezone]/> ... </process>
Examples:
<process name="sample-process"> ... <frequency>days(1)</frequency> <validity start="2012-01-01T00:40Z" end="2012-04-01T00:00" timezone="UTC"/> ... </process>
The daily workflow will start on Jan 1st 2012 at 00:40 UTC, it will run at 40th minute of every hour and the last instance will be at March 31st 2012 at 23:40 UTC.
<process name="sample-process"> ... <frequency>hours(1)</frequency> <validity start="2012-03-11T08:40Z" end="2012-03-12T08:00" timezone="PST8PDT"/> ... </process>
The hourly workflow will start on March 11th 2012 at 00:40 PST, the next instances will be at 01:40 PST, 03:40 PDT, 04:40 PDT and so on till 23:40 PDT. So, there will be just 23 instances of the workflow for March 11th 2012 because of DST switch.
Inputs define the input data for the workflow. The workflow job will start executing only after the schedule time and when all the inputs are available. There can be 0 or more inputs and each of the input maps to a feed. The path and frequency of input data is picked up from feed definition. Each input should also define start and end instances in terms of EL expressions and can optionally specify specific partition of input that the workflow requires. The components in partition should be subset of partitions defined in the feed.
For each input, Falcon will create a property with the input name that contains the comma separated list of input paths. This property can be used in workflow actions like pig scripts and so on.
Syntax:
<process name="[process name]"> ... <inputs> <input name=[input name] feed=[feed name] start=[start el] end=[end el] partition=[partition]/> ... </inputs> ... </process>
Example:
<feed name="feed1"> ... <partition name="isFraud"/> <partition name="country"/> <frequency>hours(1)</frequency> <locations> <location type="data" path="/projects/bootcamp/feed1/${YEAR}-${MONTH}-${DAY}-${HOUR}"/> ... </locations> ... </feed> <process name="sample-process"> ... <inputs> <input name="input1" feed="feed1" start="today(0,0)" end="today(1,0)" partition="*/US"/> ... </inputs> ... </process>
The input for the workflow is a hourly feed and takes 0th and 1st hour data of today(the day when the workflow runs). If the workflow is running for 2012-03-01T06:40Z, the inputs are /projects/bootcamp/feed1/2012-03-01-00/*/US and /projects/bootcamp/feed1/2012-03-01-01/*/US. The property for this input is input1=/projects/bootcamp/feed1/2012-03-01-00/*/US,/projects/bootcamp/feed1/2012-03-01-01/*/US
Also, feeds with Hive table storage can be used as inputs to a process. Several parameters from inputs are passed as params to the user workflow or pig script.
${wf:conf('falcon_input_database')} - database name associated with the feed for a given input ${wf:conf('falcon_input_table')} - table name associated with the feed for a given input ${wf:conf('falcon_input_catalog_url')} - Hive metastore URI for this input feed ${wf:conf('falcon_input_partition_filter_pig')} - value of ${coord:dataInPartitionFilter('$input', 'pig')} ${wf:conf('falcon_input_partition_filter_hive')} - value of ${coord:dataInPartitionFilter('$input', 'hive')} ${wf:conf('falcon_input_partition_filter_java')} - value of ${coord:dataInPartitionFilter('$input', 'java')}
NOTE: input is the name of the input configured in the process, which is input.getName().
<input name="input" feed="clicks-raw-table" start="yesterday(0,0)" end="yesterday(20,0)"/>
Example workflow configuration:
<configuration> <property> <name>falcon_input_database</name> <value>falcon_db</value> </property> <property> <name>falcon_input_table</name> <value>input_table</value> </property> <property> <name>falcon_input_catalog_url</name> <value>thrift://localhost:29083</value> </property> <property> <name>falcon_input_storage_type</name> <value>TABLE</value> </property> <property> <name>feedInstancePaths</name> <value>hcat://localhost:29083/falcon_db/output_table/ds=2012-04-21-00</value> </property> <property> <name>falcon_input_partition_filter_java</name> <value>(ds='2012-04-21-00')</value> </property> <property> <name>falcon_input_partition_filter_hive</name> <value>(ds='2012-04-21-00')</value> </property> <property> <name>falcon_input_partition_filter_pig</name> <value>(ds=='2012-04-21-00')</value> </property> ... </configuration>
User can mention one or more inputs as optional inputs. In such cases the job does not wait on those inputs which are mentioned as optional. If they are present it considers them otherwise continue with the compulsory ones. Example:
<feed name="feed1"> ... <partition name="isFraud"/> <partition name="country"/> <frequency>hours(1)</frequency> <locations> <location type="data" path="/projects/bootcamp/feed1/${YEAR}-${MONTH}-${DAY}-${HOUR}"/> ... </locations> ... </feed> <process name="sample-process"> ... <inputs> <input name="input1" feed="feed1" start="today(0,0)" end="today(1,0)" partition="*/US"/> <input name="input2" feed="feed2" start="today(0,0)" end="today(1,0)" partition="*/UK" optional="true" /> ... </inputs> ... </process>
Note: This is only supported for FileSystem storage but not Table storage at this point.
Outputs define the output data that is generated by the workflow. A process can define 0 or more outputs. Each output is mapped to a feed and the output path is picked up from feed definition. The output instance that should be generated is specified in terms of EL expression.
For each output, Falcon creates a property with output name that contains the path of output data. This can be used in workflows to store in the path. Syntax:
<process name="[process name]"> ... <outputs> <output name=[input name] feed=[feed name] instance=[instance el]/> ... </outputs> ... </process>
Example:
<feed name="feed2"> ... <frequency>days(1)</frequency> <locations> <location type="data" path="/projects/bootcamp/feed2/${YEAR}-${MONTH}-${DAY}"/> ... </locations> ... </feed> <process name="sample-process"> ... <outputs> <output name="output1" feed="feed2" instance="today(0,0)"/> ... </outputs> ... </process>
The output of the workflow is feed instance for today. If the workflow is running for 2012-03-01T06:40Z, the workflow generates output /projects/bootcamp/feed2/2012-03-01. The property for this output that is available for workflow is: output1=/projects/bootcamp/feed2/2012-03-01
Also, feeds with Hive table storage can be used as outputs to a process. Several parameters from outputs are passed as params to the user workflow or pig script.
${wf:conf('falcon_output_database')} - database name associated with the feed for a given output ${wf:conf('falcon_output_table')} - table name associated with the feed for a given output ${wf:conf('falcon_output_catalog_url')} - Hive metastore URI for the given output feed ${wf:conf('falcon_output_dataout_partitions')} - value of ${coord:dataOutPartitions('$output')}
NOTE: output is the name of the output configured in the process, which is output.getName().
<output name="output" feed="clicks-summary-table" instance="today(0,0)"/>
Example workflow configuration:
<configuration> <property> <name>falcon_output_database</name> <value>falcon_db</value> </property> <property> <name>falcon_output_table</name> <value>output_table</value> </property> <property> <name>falcon_output_catalog_url</name> <value>thrift://localhost:29083</value> </property> <property> <name>falcon_output_storage_type</name> <value>TABLE</value> </property> <property> <name>feedInstancePaths</name> <value>hcat://localhost:29083/falcon_db/output_table/ds=2012-04-21-00</value> </property> <property> <name>falcon_output_dataout_partitions</name> <value>'ds=2012-04-21-00'</value> </property> .... </configuration>
The properties are key value pairs that are passed to the workflow. These properties are optional and can be used in workflow to parameterize the workflow. Syntax:
<process name="[process name]"> ... <properties> <property name=[key] value=[value]/> ... </properties> ... </process>
queueName and jobPriority are special properties, which when present are used by the Falcon's launcher job, the same property is also available in workflow which can be used to propagate to pig or M/R job.
<property name="queueName" value="hadoopQueue"/> <property name="jobPriority" value="VERY_HIGH"/>
The workflow defines the workflow engine that should be used and the path to the workflow on hdfs. The workflow definition on hdfs contains the actual job that should run and it should confirm to the workflow specification of the engine specified. The libraries required by the workflow should be in lib folder inside the workflow path.
The properties defined in the cluster and cluster properties(nameNode and jobTracker) will also be available for the workflow.
There are 2 engines supported today.
As part of oozie workflow engine support, users can embed a oozie workflow. Refer to oozie workflow overview and workflow specification for details.
Syntax:
<process name="[process name]"> ... <workflow engine=[workflow engine] path=[workflow path]/> ... </process>
Example:
<process name="sample-process"> ... <workflow engine="oozie" path="/projects/bootcamp/workflow"/> ... </process>
This defines the workflow engine to be oozie and the workflow xml is defined at /projects/bootcamp/workflow/workflow.xml. The libraries are at /projects/bootcamp/workflow/lib.
Falcon also adds the Pig engine which enables users to embed a Pig script as a process.
Example:
<process name="sample-process"> ... <workflow engine="pig" path="/projects/bootcamp/pig.script"/> ... </process>
This defines the workflow engine to be pig and the pig script is defined at /projects/bootcamp/pig.script.
Feeds with Hive table storage will send one more parameter apart from the general ones:
$input_filter
Falcon also adds the Hive engine as part of Hive Integration which enables users to embed a Hive script as a process. This would enable users to create materialized queries in a declarative way.
Example:
<process name="sample-process"> ... <workflow engine="hive" path="/projects/bootcamp/hive-script.hql"/> ... </process>
This defines the workflow engine to be hive and the hive script is defined at /projects/bootcamp/hive-script.hql.
Feeds with Hive table storage will send one more parameter apart from the general ones:
$input_filter
Retry policy defines how the workflow failures should be handled. Two retry policies are defined: backoff and exp-backoff(exponential backoff). Depending on the delay and number of attempts, the workflow is re-tried after specific intervals. Syntax:
<process name="[process name]"> ... <retry policy=[retry policy] delay=[retry delay] attempts=[retry attempts]/> ... </process>
Examples:
<process name="sample-process"> ... <retry policy="backoff" delay="minutes(10)" attempts="3"/> ... </process>
The workflow is re-tried after 10 mins, 20 mins and 30 mins. With exponential backoff, the workflow will be re-tried after 10 mins, 20 mins and 40 mins.
Late data handling defines how the late data should be handled. Each feed is defined with a late cut-off value which specifies the time till which late data is valid. For example, late cut-off of hours(6) means that data for nth hour can get delayed by upto 6 hours. Late data specification in process defines how this late data is handled.
Late data policy defines how frequently check is done to detect late data. The policies supported are: backoff, exp-backoff(exponention backoff) and final(at feed's late cut-off). The policy along with delay defines the interval at which late data check is done.
Late input specification for each input defines the workflow that should run when late data is detected for that input.
Syntax:
<process name="[process name]"> ... <late-process policy=[late handling policy] delay=[delay]> <late-input input=[input name] workflow-path=[workflow path]/> ... </late-process> ... </process>
Example:
<feed name="feed1"> ... <frequency>hours(1)</frequency> <late-arrival cut-off="hours(6)"/> ... </feed> <process name="sample-process"> ... <inputs> <input name="input1" feed="feed1" start="today(0,0)" end="today(1,0)"/> ... </inputs> <late-process policy="final"> <late-input input="input1" workflow-path="/projects/bootcamp/workflow/lateinput1" /> ... </late-process> ... </process>
This late handling specifies that late data detection should run at feed's late cut-off which is 6 hours in this case. If there is late data, Falcon should run the workflow specified at /projects/bootcamp/workflow/lateinput1/workflow.xml
Note: This is only supported for FileSystem storage but not Table storage at this point.