aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorRoman Shaposhnik <rvs@cloudera.com>2013-04-28 22:19:02 -0700
committerRoman Shaposhnik <rvs@cloudera.com>2013-04-28 23:15:42 -0700
commit41213f9816e7cc33359b35d61de13ec0a3bce1a8 (patch)
tree5927c5516d0b91f23547dcfec47d025d4b04bac0
parent2d645244b5900649bb3a91f99fd115639009ff08 (diff)
downloadbigtop-odpi-41213f9816e7cc33359b35d61de13ec0a3bce1a8.tar.gz
BIGTOP-728. add datafu integration test (Johnny Zhang via rvs)
-rw-r--r--bigtop-tests/test-artifacts/datafu/pom.xml48
-rw-r--r--bigtop-tests/test-artifacts/datafu/src/main/groovy/java/datafu/linkanalysis/PageRank.java441
-rw-r--r--bigtop-tests/test-artifacts/datafu/src/main/groovy/java/datafu/pig/linkanalysis/PageRank.java372
-rw-r--r--bigtop-tests/test-artifacts/datafu/src/main/groovy/org/apache/bigtop/itest/datafu/PigTests.java193
-rw-r--r--bigtop-tests/test-artifacts/datafu/src/main/groovy/org/apache/bigtop/itest/datafu/bags/BagTests.java290
-rw-r--r--bigtop-tests/test-artifacts/datafu/src/main/groovy/org/apache/bigtop/itest/datafu/bags/sets/SetTests.java56
-rw-r--r--bigtop-tests/test-artifacts/datafu/src/main/groovy/org/apache/bigtop/itest/datafu/date/TimeTests.java47
-rw-r--r--bigtop-tests/test-artifacts/datafu/src/main/groovy/org/apache/bigtop/itest/datafu/geo/GeoTests.java57
-rw-r--r--bigtop-tests/test-artifacts/datafu/src/main/groovy/org/apache/bigtop/itest/datafu/hash/HashTests.java45
-rw-r--r--bigtop-tests/test-artifacts/datafu/src/main/groovy/org/apache/bigtop/itest/datafu/linkanalysis/PageRankTest.java281
-rw-r--r--bigtop-tests/test-artifacts/datafu/src/main/groovy/org/apache/bigtop/itest/datafu/linkanalysis/PageRankTests.java102
-rw-r--r--bigtop-tests/test-artifacts/datafu/src/main/groovy/org/apache/bigtop/itest/datafu/numbers/NumberTests.java47
-rw-r--r--bigtop-tests/test-artifacts/datafu/src/main/groovy/org/apache/bigtop/itest/datafu/sessions/SessionTests.java74
-rw-r--r--bigtop-tests/test-artifacts/datafu/src/main/groovy/org/apache/bigtop/itest/datafu/stats/MarkovPairTests.java87
-rw-r--r--bigtop-tests/test-artifacts/datafu/src/main/groovy/org/apache/bigtop/itest/datafu/stats/QuantileTests.java178
-rw-r--r--bigtop-tests/test-artifacts/datafu/src/main/groovy/org/apache/bigtop/itest/datafu/stats/WilsonBinConfTests.java63
-rw-r--r--bigtop-tests/test-artifacts/datafu/src/main/groovy/org/apache/bigtop/itest/datafu/urls/UserAgentTest.java39
-rw-r--r--bigtop-tests/test-artifacts/datafu/src/main/groovy/org/apache/bigtop/itest/datafu/util/AssertTests.java75
-rw-r--r--bigtop-tests/test-artifacts/datafu/src/main/groovy/org/apache/bigtop/itest/datafu/util/IntBoolConversionPigTests.java59
-rw-r--r--bigtop-tests/test-artifacts/datafu/src/main/resources/datafu/bags/aliasBagFieldsTest.pig20
-rw-r--r--bigtop-tests/test-artifacts/datafu/src/main/resources/datafu/bags/appendToBagTest.pig9
-rw-r--r--bigtop-tests/test-artifacts/datafu/src/main/resources/datafu/bags/bagConcatTest.pig11
-rw-r--r--bigtop-tests/test-artifacts/datafu/src/main/resources/datafu/bags/bagSplitTest.pig14
-rw-r--r--bigtop-tests/test-artifacts/datafu/src/main/resources/datafu/bags/bagSplitWithBagNumTest.pig11
-rw-r--r--bigtop-tests/test-artifacts/datafu/src/main/resources/datafu/bags/comprehensiveBagSplitAndEnumerate.pig26
-rw-r--r--bigtop-tests/test-artifacts/datafu/src/main/resources/datafu/bags/distinctByTest.pig12
-rw-r--r--bigtop-tests/test-artifacts/datafu/src/main/resources/datafu/bags/enumerateTest.pig16
-rw-r--r--bigtop-tests/test-artifacts/datafu/src/main/resources/datafu/bags/enumerateWithReverseTest.pig16
-rw-r--r--bigtop-tests/test-artifacts/datafu/src/main/resources/datafu/bags/enumerateWithStartTest.pig16
-rw-r--r--bigtop-tests/test-artifacts/datafu/src/main/resources/datafu/bags/firstTupleFromBagTest.pig9
-rw-r--r--bigtop-tests/test-artifacts/datafu/src/main/resources/datafu/bags/nullToEmptyBagTest.pig14
-rw-r--r--bigtop-tests/test-artifacts/datafu/src/main/resources/datafu/bags/prependToBagTest.pig9
-rw-r--r--bigtop-tests/test-artifacts/datafu/src/main/resources/datafu/bags/sets/setIntersectTest.pig9
-rw-r--r--bigtop-tests/test-artifacts/datafu/src/main/resources/datafu/bags/sets/setUnionTest.pig13
-rw-r--r--bigtop-tests/test-artifacts/datafu/src/main/resources/datafu/bags/unorderedPairsTests.pig16
-rw-r--r--bigtop-tests/test-artifacts/datafu/src/main/resources/datafu/bags/unorderedPairsTests2.pig12
-rw-r--r--bigtop-tests/test-artifacts/datafu/src/main/resources/datafu/date/timeCountPageViewsTest.pig13
-rw-r--r--bigtop-tests/test-artifacts/datafu/src/main/resources/datafu/geo/haversineTest.pig9
-rw-r--r--bigtop-tests/test-artifacts/datafu/src/main/resources/datafu/hash/md5Base64Test.pig9
-rw-r--r--bigtop-tests/test-artifacts/datafu/src/main/resources/datafu/hash/md5Test.pig9
-rw-r--r--bigtop-tests/test-artifacts/datafu/src/main/resources/datafu/linkanalysis/pageRankTest.pig25
-rw-r--r--bigtop-tests/test-artifacts/datafu/src/main/resources/datafu/numbers/randomIntRangeTest.pig8
-rw-r--r--bigtop-tests/test-artifacts/datafu/src/main/resources/datafu/sessions/sessionizeTest.pig17
-rw-r--r--bigtop-tests/test-artifacts/datafu/src/main/resources/datafu/stats/markovPairDefault.pig14
-rw-r--r--bigtop-tests/test-artifacts/datafu/src/main/resources/datafu/stats/markovPairLookahead.pig14
-rw-r--r--bigtop-tests/test-artifacts/datafu/src/main/resources/datafu/stats/medianTest.pig21
-rw-r--r--bigtop-tests/test-artifacts/datafu/src/main/resources/datafu/stats/quantileTest.pig21
-rw-r--r--bigtop-tests/test-artifacts/datafu/src/main/resources/datafu/stats/streamingMedianTest.pig21
-rw-r--r--bigtop-tests/test-artifacts/datafu/src/main/resources/datafu/stats/streamingQuantileTest.pig18
-rw-r--r--bigtop-tests/test-artifacts/datafu/src/main/resources/datafu/stats/wilsonBinConfTests.pig11
-rw-r--r--bigtop-tests/test-artifacts/datafu/src/main/resources/datafu/urls/userAgentTest.pig8
-rw-r--r--bigtop-tests/test-artifacts/datafu/src/main/resources/datafu/util/assertWithMessageTest.pig10
-rw-r--r--bigtop-tests/test-artifacts/datafu/src/main/resources/datafu/util/assertWithoutMessageTest.pig10
-rw-r--r--bigtop-tests/test-artifacts/datafu/src/main/resources/datafu/util/intToBoolTest.pig10
-rw-r--r--bigtop-tests/test-artifacts/datafu/src/main/resources/datafu/util/intToBoolToIntTest.pig12
-rw-r--r--bigtop-tests/test-artifacts/pom.xml1
-rw-r--r--bigtop-tests/test-execution/smokes/datafu/pom.xml123
-rw-r--r--pom.xml3
58 files changed, 3171 insertions, 3 deletions
diff --git a/bigtop-tests/test-artifacts/datafu/pom.xml b/bigtop-tests/test-artifacts/datafu/pom.xml
new file mode 100644
index 00000000..04f5a5b1
--- /dev/null
+++ b/bigtop-tests/test-artifacts/datafu/pom.xml
@@ -0,0 +1,48 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+
+ <parent>
+ <groupId>org.apache.bigtop.itest</groupId>
+ <artifactId>bigtop-smokes</artifactId>
+ <version>0.6.0-SNAPSHOT</version>
+ <relativePath>../pom.xml</relativePath>
+ </parent>
+
+ <modelVersion>4.0.0</modelVersion>
+ <groupId>org.apache.bigtop.itest</groupId>
+ <artifactId>datafu-smoke</artifactId>
+ <version>0.6.0-SNAPSHOT</version>
+ <name>datafusmoke</name>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-client</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.pig</groupId>
+ <artifactId>pig</artifactId>
+ <version>0.11.1</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.pig</groupId>
+ <artifactId>pigunit</artifactId>
+ <version>0.11.1</version>
+ </dependency>
+ <dependency>
+ <groupId>com.google.guava</groupId>
+ <artifactId>guava</artifactId>
+ <version>r06</version>
+ </dependency>
+ <dependency>
+ <groupId>joda-time</groupId>
+ <artifactId>joda-time</artifactId>
+ <version>1.6</version>
+ </dependency>
+ <dependency>
+ <groupId>it.unimi.dsi</groupId>
+ <artifactId>fastutil</artifactId>
+ <version>6.3</version>
+ </dependency>
+ </dependencies>
+</project>
diff --git a/bigtop-tests/test-artifacts/datafu/src/main/groovy/java/datafu/linkanalysis/PageRank.java b/bigtop-tests/test-artifacts/datafu/src/main/groovy/java/datafu/linkanalysis/PageRank.java
new file mode 100644
index 00000000..2cadcf92
--- /dev/null
+++ b/bigtop-tests/test-artifacts/datafu/src/main/groovy/java/datafu/linkanalysis/PageRank.java
@@ -0,0 +1,441 @@
+/*
+ * Copyright 2010 LinkedIn, Inc
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package datafu.linkanalysis;
+
+import it.unimi.dsi.fastutil.floats.FloatArrayList;
+import it.unimi.dsi.fastutil.ints.Int2IntMap;
+import it.unimi.dsi.fastutil.ints.Int2IntOpenHashMap;
+import it.unimi.dsi.fastutil.ints.IntArrayList;
+
+import java.io.BufferedInputStream;
+import java.io.BufferedOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.Map;
+
+import com.google.common.collect.AbstractIterator;
+
+/**
+ * An implementation of {@link <a href="http://en.wikipedia.org/wiki/PageRank" target="_blank">PageRank</a>}.
+ * This implementation is not distributed. It is intended for graphs of a reasonable size which can be processed
+ * on a single machine. Nodes are stored in memory. Edges are stored in memory and can optionally be spilled to
+ * disk once a certain limit is reached.
+ */
+public class PageRank
+{
+ private float totalRankChange;
+ private long edgeCount;
+ private long nodeCount;
+
+ // the damping factor
+ private static float ALPHA = 0.85f;
+
+ // edge weights (which are doubles) are multiplied by this value so they can be stored as integers internally
+ private static float EDGE_WEIGHT_MULTIPLIER = 100000;
+
+ private final Int2IntOpenHashMap nodeIndices = new Int2IntOpenHashMap();
+ private final FloatArrayList nodeData = new FloatArrayList(); // rank, total weight, contribution, (repeat)
+
+ private final IntArrayList danglingNodes = new IntArrayList();
+
+ private final IntArrayList edges = new IntArrayList(); // source, dest node count... dest id, weight pos, (repeat)
+
+ private boolean shouldHandleDanglingNodes = false;
+ private boolean shouldCacheEdgesOnDisk = false;
+ private long edgeCachingThreshold;
+
+ private File edgesFile;
+ private DataOutputStream edgeDataOutputStream;
+ private boolean usingEdgeDiskCache;
+
+ public interface ProgressIndicator
+ {
+ void progress();
+ }
+
+ public void clear() throws IOException
+ {
+ this.edgeCount = 0;
+ this.nodeCount = 0;
+ this.totalRankChange = 0.0f;
+
+ this.nodeIndices.clear();
+ this.nodeData.clear();
+ this.edges.clear();
+ this.danglingNodes.clear();
+
+ if (edgeDataOutputStream != null)
+ {
+ this.edgeDataOutputStream.close();
+ this.edgeDataOutputStream = null;
+ }
+
+ this.usingEdgeDiskCache = false;
+ this.edgesFile = null;
+ }
+
+ /**
+ * Gets whether disk is being used to cache edges.
+ * @return True if the edges are cached on disk.
+ */
+ public boolean isUsingEdgeDiskCache()
+ {
+ return usingEdgeDiskCache;
+ }
+
+ /**
+ * Enable disk caching of edges once there are too many (disabled by default).
+ */
+ public void enableEdgeDiskCaching()
+ {
+ shouldCacheEdgesOnDisk = true;
+ }
+
+ /**
+ * Disable disk caching of edges once there are too many (disabled by default).
+ */
+ public void disableEdgeDiskCaching()
+ {
+ shouldCacheEdgesOnDisk = false;
+ }
+
+ /**
+ * Gets whether edge disk caching is enabled.
+ * @return True if edge disk caching is enabled.
+ */
+ public boolean isEdgeDiskCachingEnabled()
+ {
+ return shouldCacheEdgesOnDisk;
+ }
+
+ /**
+ * Gets the number of edges past which they will be cached on disk instead of in memory.
+ * Edge disk caching must be enabled for this to have any effect.
+ * @return Edge count past which caching occurs
+ */
+ public long getEdgeCachingThreshold()
+ {
+ return edgeCachingThreshold;
+ }
+
+ /**
+ * Set the number of edges past which they will be cached on disk instead of in memory.
+ * Edge disk caching must be enabled for this to have any effect.
+ * @param count Edge count past which caching occurs
+ */
+ public void setEdgeCachingThreshold(long count)
+ {
+ edgeCachingThreshold = count;
+ }
+
+ /**
+ * Enables dangling node handling (disabled by default).
+ */
+ public void enableDanglingNodeHandling()
+ {
+ shouldHandleDanglingNodes = true;
+ }
+
+ /**
+ * Disables dangling node handling (disabled by default).
+ */
+ public void disableDanglingNodeHandling()
+ {
+ shouldHandleDanglingNodes = false;
+ }
+
+ public long nodeCount()
+ {
+ return this.nodeCount;
+ }
+
+ public long edgeCount()
+ {
+ return this.edgeCount;
+ }
+
+ public Int2IntMap.FastEntrySet getNodeIds()
+ {
+ return this.nodeIndices.int2IntEntrySet();
+ }
+
+ public float getNodeRank(int nodeId)
+ {
+ int nodeIndex = this.nodeIndices.get(nodeId);
+ return nodeData.get(nodeIndex);
+ }
+
+ public float getTotalRankChange()
+ {
+ return this.totalRankChange;
+ }
+
+ private void maybeCreateNode(int nodeId)
+ {
+ // create from node if it doesn't already exist
+ if (!nodeIndices.containsKey(nodeId))
+ {
+ int index = this.nodeData.size();
+
+ this.nodeData.add(0.0f); // rank
+ this.nodeData.add(0.0f); // total weight
+ this.nodeData.add(0.0f); // contribution
+
+ this.nodeIndices.put(nodeId, index);
+
+ this.nodeCount++;
+ }
+ }
+
+ public void addEdges(Integer sourceId, ArrayList<Map<String,Object>> sourceEdges) throws IOException
+ {
+ int source = sourceId.intValue();
+
+ maybeCreateNode(source);
+
+ if (this.shouldCacheEdgesOnDisk && !usingEdgeDiskCache && (sourceEdges.size() + this.edgeCount) >= this.edgeCachingThreshold)
+ {
+ writeEdgesToDisk();
+ }
+
+ // store the source node id itself
+ appendEdgeData(source);
+
+ // store how many outgoing edges this node has
+ appendEdgeData(sourceEdges.size());
+
+ // store the outgoing edges
+ for (Map<String,Object> edge : sourceEdges)
+ {
+ int dest = ((Integer)edge.get("dest")).intValue();
+ float weight = ((Double)edge.get("weight")).floatValue();
+
+ maybeCreateNode(dest);
+
+ appendEdgeData(dest);
+
+ // location of weight in weights array
+ appendEdgeData(Math.max(1, (int)(weight * EDGE_WEIGHT_MULTIPLIER)));
+
+ this.edgeCount++;
+ }
+ }
+
+ private void appendEdgeData(int data) throws IOException
+ {
+ if (this.edgeDataOutputStream != null)
+ {
+ this.edgeDataOutputStream.writeInt(data);
+ }
+ else
+ {
+ this.edges.add(data);
+ }
+ }
+
+ public void init(ProgressIndicator progressIndicator) throws IOException
+ {
+ if (this.edgeDataOutputStream != null)
+ {
+ this.edgeDataOutputStream.close();
+ this.edgeDataOutputStream = null;
+ }
+
+ // initialize all nodes to an equal share of the total rank (1.0)
+ float nodeRank = 1.0f / this.nodeCount;
+ for (int j=0; j<this.nodeData.size(); j+=3)
+ {
+ nodeData.set(j, nodeRank);
+ progressIndicator.progress();
+ }
+
+ Iterator<Integer> edgeData = getEdgeData();
+
+ while(edgeData.hasNext())
+ {
+ int sourceId = edgeData.next();
+ int nodeEdgeCount = edgeData.next();
+
+ while (nodeEdgeCount-- > 0)
+ {
+ // skip the destination node id
+ edgeData.next();
+
+ float weight = edgeData.next();
+
+ int nodeIndex = this.nodeIndices.get(sourceId);
+
+ float totalWeight = this.nodeData.getFloat(nodeIndex+1);
+ totalWeight += weight;
+ this.nodeData.set(nodeIndex+1, totalWeight);
+
+ progressIndicator.progress();
+ }
+ }
+
+ // if handling dangling nodes, get a list of them by finding those nodes with no outgoing
+ // edges (i.e. total outgoing edge weight is 0.0)
+ if (shouldHandleDanglingNodes)
+ {
+ for (Map.Entry<Integer,Integer> e : nodeIndices.entrySet())
+ {
+ int nodeId = e.getKey();
+ int nodeIndex = e.getValue();
+ float totalWeight = nodeData.getFloat(nodeIndex+1);
+ if (totalWeight == 0.0f)
+ {
+ danglingNodes.add(nodeId);
+ }
+ }
+ }
+ }
+
+ public float nextIteration(ProgressIndicator progressIndicator) throws IOException
+ {
+ distribute(progressIndicator);
+ commit(progressIndicator);
+
+ return getTotalRankChange();
+ }
+
+ public void distribute(ProgressIndicator progressIndicator) throws IOException
+ {
+ Iterator<Integer> edgeData = getEdgeData();
+
+ while(edgeData.hasNext())
+ {
+ int sourceId = edgeData.next();
+ int nodeEdgeCount = edgeData.next();
+
+ while (nodeEdgeCount-- > 0)
+ {
+ int toId = edgeData.next();
+ float weight = edgeData.next();
+
+ int fromNodeIndex = this.nodeIndices.get(sourceId);
+ int toNodeIndex = this.nodeIndices.get(toId);
+
+ float contributionChange = weight * this.nodeData.getFloat(fromNodeIndex) / this.nodeData.getFloat(fromNodeIndex+1);
+
+ float currentContribution = this.nodeData.getFloat(toNodeIndex+2);
+ this.nodeData.set(toNodeIndex+2, currentContribution + contributionChange);
+
+ progressIndicator.progress();
+ }
+ }
+
+ if (shouldHandleDanglingNodes)
+ {
+ // get the rank from each of the dangling nodes
+ float totalRank = 0.0f;
+ for (int nodeId : danglingNodes)
+ {
+ int nodeIndex = nodeIndices.get(nodeId);
+ float rank = nodeData.get(nodeIndex);
+ totalRank += rank;
+ }
+
+ // distribute the dangling node ranks to all the nodes in the graph
+ // note: the alpha factor is applied in the commit stage
+ float contributionIncrease = totalRank / this.nodeCount;
+ for (int i=2; i<nodeData.size(); i += 3)
+ {
+ float contribution = nodeData.getFloat(i);
+ contribution += contributionIncrease;
+ nodeData.set(i, contribution);
+ }
+ }
+ }
+
+ public void commit(ProgressIndicator progressIndicator)
+ {
+ this.totalRankChange = 0.0f;
+
+ for (int id : nodeIndices.keySet())
+ {
+ int nodeIndex = this.nodeIndices.get(id);
+
+ float alpha = datafu.linkanalysis.PageRank.ALPHA;
+ float newRank = (1.0f - alpha)/nodeCount + alpha * this.nodeData.get(nodeIndex+2);
+
+ this.nodeData.set(nodeIndex+2, 0.0f);
+
+ float lastRankDiff = newRank - this.nodeData.get(nodeIndex);
+
+ this.nodeData.set(nodeIndex, newRank);
+
+ this.totalRankChange += Math.abs(lastRankDiff);
+
+ progressIndicator.progress();
+ }
+ }
+
+ private void writeEdgesToDisk() throws IOException
+ {
+ this.edgesFile = File.createTempFile("fastgraph", null);
+
+ FileOutputStream outStream = new FileOutputStream(this.edgesFile);
+ BufferedOutputStream bufferedStream = new BufferedOutputStream(outStream);
+ this.edgeDataOutputStream = new DataOutputStream(bufferedStream);
+
+ for (int edgeData : edges)
+ {
+ this.edgeDataOutputStream.writeInt(edgeData);
+ }
+
+ this.edges.clear();
+ usingEdgeDiskCache = true;
+ }
+
+ private Iterator<Integer> getEdgeData() throws IOException
+ {
+ if (!usingEdgeDiskCache)
+ {
+ return this.edges.iterator();
+ }
+ else
+ {
+ FileInputStream fileInputStream = new FileInputStream(this.edgesFile);
+ BufferedInputStream inputStream = new BufferedInputStream(fileInputStream);
+ final DataInputStream dataInputStream = new DataInputStream(inputStream);
+
+ return new AbstractIterator<Integer>() {
+
+ @Override
+ protected Integer computeNext()
+ {
+ try
+ {
+ return dataInputStream.readInt();
+ }
+ catch (IOException e)
+ {
+ return endOfData();
+ }
+ }
+
+ };
+ }
+ }
+}
+
diff --git a/bigtop-tests/test-artifacts/datafu/src/main/groovy/java/datafu/pig/linkanalysis/PageRank.java b/bigtop-tests/test-artifacts/datafu/src/main/groovy/java/datafu/pig/linkanalysis/PageRank.java
new file mode 100644
index 00000000..2460fc2f
--- /dev/null
+++ b/bigtop-tests/test-artifacts/datafu/src/main/groovy/java/datafu/pig/linkanalysis/PageRank.java
@@ -0,0 +1,372 @@
+/*
+ * Copyright 2010 LinkedIn, Inc
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package datafu.pig.linkanalysis;
+
+import it.unimi.dsi.fastutil.ints.Int2IntMap;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.pig.Accumulator;
+import org.apache.pig.EvalFunc;
+import org.apache.pig.data.BagFactory;
+import org.apache.pig.data.DataBag;
+import org.apache.pig.data.DataType;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.data.TupleFactory;
+import org.apache.pig.impl.logicalLayer.FrontendException;
+import org.apache.pig.impl.logicalLayer.schema.Schema;
+
+import datafu.linkanalysis.PageRank.ProgressIndicator;
+
+
+/**
+ * A UDF which implements {@link <a href="http://en.wikipedia.org/wiki/PageRank" target="_blank">PageRank</a>}.
+ * Each graph is stored in memory while running the algorithm, with edges optionally
+ * spilled to disk to conserve memory. This can be used to distribute the execution of PageRank on a large number of
+ * reasonable sized graphs. It does not distribute execuion of PageRank on a single graph. Each graph is identified
+ * by an integer valued topic ID.
+ * <p>
+ * Example:
+ * <pre>
+ * {@code
+ *
+ * topic_edges = LOAD 'input_edges' as (topic:INT,source:INT,dest:INT,weight:DOUBLE);
+ *
+ * topic_edges_grouped = GROUP topic_edges by (topic, source) ;
+ * topic_edges_grouped = FOREACH topic_edges_grouped GENERATE
+ * group.topic as topic,
+ * group.source as source,
+ * topic_edges.(dest,weight) as edges;
+ *
+ * topic_edges_grouped_by_topic = GROUP topic_edges_grouped BY topic;
+ *
+ * topic_ranks = FOREACH topic_edges_grouped_by_topic GENERATE
+ * group as topic,
+ * FLATTEN(PageRank(topic_edges_grouped.(source,edges))) as (source,rank);
+ *
+ * skill_ranks = FOREACH skill_ranks GENERATE
+ * topic, source, rank;
+ *
+ * }
+ * </pre>
+ */
+public class PageRank extends EvalFunc<DataBag> implements Accumulator<DataBag>
+{
+ private final datafu.linkanalysis.PageRank graph = new datafu.linkanalysis.PageRank();
+
+ private int maxNodesAndEdges = 100000000;
+ private int maxEdgesInMemory = 30000000;
+ private double tolerance = 1e-16;
+ private int maxIters = 150;
+ private boolean useEdgeDiskStorage = false;
+ private boolean enableDanglingNodeHandling = false;
+ private boolean aborted = false;
+
+ TupleFactory tupleFactory = TupleFactory.getInstance();
+ BagFactory bagFactory = BagFactory.getInstance();
+
+ public PageRank()
+ {
+ initialize();
+ }
+
+ public PageRank(String... parameters)
+ {
+ if (parameters.length % 2 != 0)
+ {
+ throw new RuntimeException("Invalid parameters list");
+ }
+
+ for (int i=0; i<parameters.length; i+=2)
+ {
+ String parameterName = parameters[i];
+ String value = parameters[i+1];
+ if (parameterName.equals("max_nodes_and_edges"))
+ {
+ maxNodesAndEdges = Integer.parseInt(value);
+ }
+ else if (parameterName.equals("max_edges_in_memory"))
+ {
+ maxEdgesInMemory = Integer.parseInt(value);
+ }
+ else if (parameterName.equals("tolerance"))
+ {
+ tolerance = Double.parseDouble(value);
+ }
+ else if (parameterName.equals("max_iters"))
+ {
+ maxIters = Integer.parseInt(value);
+ }
+ else if (parameterName.equals("spill_to_edge_disk_storage"))
+ {
+ useEdgeDiskStorage = Boolean.parseBoolean(value);
+ }
+ else if (parameterName.equals("dangling_nodes"))
+ {
+ enableDanglingNodeHandling = Boolean.parseBoolean(value);
+ }
+ }
+
+ initialize();
+ }
+
+ private void initialize()
+ {
+ long heapSize = Runtime.getRuntime().totalMemory();
+ long heapMaxSize = Runtime.getRuntime().maxMemory();
+ long heapFreeSize = Runtime.getRuntime().freeMemory();
+// System.out.println(String.format("Heap size: %d, Max heap size: %d, Heap free size: %d", heapSize, heapMaxSize, heapFreeSize));
+
+ if (useEdgeDiskStorage)
+ {
+ this.graph.enableEdgeDiskCaching();
+ }
+ else
+ {
+ this.graph.disableEdgeDiskCaching();
+ }
+
+ if (enableDanglingNodeHandling)
+ {
+ this.graph.enableDanglingNodeHandling();
+ }
+ else
+ {
+ this.graph.disableDanglingNodeHandling();
+ }
+
+ this.graph.setEdgeCachingThreshold(maxEdgesInMemory);
+ }
+
+ @Override
+ public void accumulate(Tuple t) throws IOException
+ {
+ if (aborted)
+ {
+ return;
+ }
+
+ DataBag bag = (DataBag) t.get(0);
+ if (bag == null || bag.size() == 0)
+ return;
+
+ for (Tuple sourceTuple : bag)
+ {
+ Integer sourceId = (Integer)sourceTuple.get(0);
+ DataBag edges = (DataBag)sourceTuple.get(1);
+
+ ArrayList<Map<String,Object>> edgesMapList = new ArrayList<Map<String, Object>>();
+
+ for (Tuple edgeTuple : edges)
+ {
+ Integer destId = (Integer)edgeTuple.get(0);
+ Double weight = (Double)edgeTuple.get(1);
+ HashMap<String,Object> edgeMap = new HashMap<String, Object>();
+ edgeMap.put("dest",destId);
+ edgeMap.put("weight",weight);
+ edgesMapList.add(edgeMap);
+ }
+
+ graph.addEdges(sourceId, edgesMapList);
+
+ if (graph.nodeCount() + graph.edgeCount() > maxNodesAndEdges)
+ {
+ System.out.println(String.format("There are too many nodes and edges (%d + %d > %d). Aborting.", graph.nodeCount(), graph.edgeCount(), maxNodesAndEdges));
+ aborted = true;
+ }
+
+ reporter.progress();
+ }
+ }
+
+ @Override
+ public DataBag getValue()
+ {
+ if (aborted)
+ {
+ return null;
+ }
+
+ System.out.println(String.format("Nodes: %d, Edges: %d", graph.nodeCount(), graph.edgeCount()));
+
+ ProgressIndicator progressIndicator = getProgressIndicator();
+ System.out.println("Finished loading graph.");
+ long startTime = System.nanoTime();
+ System.out.println("Initializing.");
+ try
+ {
+ graph.init(progressIndicator);
+ }
+ catch (IOException e)
+ {
+ e.printStackTrace();
+ return null;
+ }
+ System.out.println(String.format("Done, took %f ms", (System.nanoTime() - startTime)/10.0e6));
+
+ float totalDiff;
+ int iter = 0;
+
+ System.out.println("Beginning iterations");
+ startTime = System.nanoTime();
+ do
+ {
+ // TODO log percentage complete every 5 minutes
+ try
+ {
+ totalDiff = graph.nextIteration(progressIndicator);
+ }
+ catch (IOException e)
+ {
+ e.printStackTrace();
+ return null;
+ }
+ iter++;
+ } while(iter < maxIters && totalDiff > tolerance);
+ System.out.println(String.format("Done, %d iterations took %f ms", iter, (System.nanoTime() - startTime)/10.0e6));
+
+ DataBag output = bagFactory.newDefaultBag();
+
+ for (Int2IntMap.Entry node : graph.getNodeIds())
+ {
+ int nodeId = node.getIntKey();
+ float rank = graph.getNodeRank(nodeId);
+ List nodeData = new ArrayList(2);
+ nodeData.add(nodeId);
+ nodeData.add(rank);
+ output.add(tupleFactory.newTuple(nodeData));
+ }
+
+ return output;
+ }
+
+ @Override
+ public void cleanup()
+ {
+ try
+ {
+ aborted = false;
+ this.graph.clear();
+ }
+ catch (IOException e)
+ {
+ e.printStackTrace();
+ }
+ }
+
+ @Override
+ public DataBag exec(Tuple input) throws IOException
+ {
+ try
+ {
+ accumulate(input);
+
+ return getValue();
+ }
+ finally
+ {
+ cleanup();
+ }
+ }
+
+ private ProgressIndicator getProgressIndicator()
+ {
+ return new ProgressIndicator()
+ {
+ @Override
+ public void progress()
+ {
+ reporter.progress();
+ }
+ };
+ }
+
+ @Override
+ public Schema outputSchema(Schema input)
+ {
+ try
+ {
+ Schema.FieldSchema inputFieldSchema = input.getField(0);
+
+ if (inputFieldSchema.type != DataType.BAG)
+ {
+ throw new RuntimeException("Expected a BAG as input");
+ }
+
+ Schema inputBagSchema = inputFieldSchema.schema;
+
+ if (inputBagSchema.getField(0).type != DataType.TUPLE)
+ {
+ throw new RuntimeException(String.format("Expected input bag to contain a TUPLE, but instead found %s",
+ DataType.findTypeName(inputBagSchema.getField(0).type)));
+ }
+
+ Schema inputTupleSchema = inputBagSchema.getField(0).schema;
+
+ if (inputTupleSchema.getField(0).type != DataType.INTEGER)
+ {
+ throw new RuntimeException(String.format("Expected source to be an INTEGER, but instead found %s",
+ DataType.findTypeName(inputTupleSchema.getField(0).type)));
+ }
+
+ if (inputTupleSchema.getField(1).type != DataType.BAG)
+ {
+ throw new RuntimeException(String.format("Expected edges to be represented with a BAG"));
+ }
+
+ Schema.FieldSchema edgesFieldSchema = inputTupleSchema.getField(1);
+
+ if (edgesFieldSchema.schema.getField(0).type != DataType.TUPLE)
+ {
+ throw new RuntimeException(String.format("Expected edges field to contain a TUPLE, but instead found %s",
+ DataType.findTypeName(edgesFieldSchema.schema.getField(0).type)));
+ }
+
+ Schema edgesTupleSchema = edgesFieldSchema.schema.getField(0).schema;
+
+ if (edgesTupleSchema.getField(0).type != DataType.INTEGER)
+ {
+ throw new RuntimeException(String.format("Expected destination edge ID to an INTEGER, but instead found %s",
+ DataType.findTypeName(edgesFieldSchema.schema.getField(0).type)));
+ }
+
+ if (edgesTupleSchema.getField(1).type != DataType.DOUBLE)
+ {
+ throw new RuntimeException(String.format("Expected destination edge weight to a DOUBLE, but instead found %s",
+ DataType.findTypeName(edgesFieldSchema.schema.getField(1).type)));
+ }
+
+ Schema tupleSchema = new Schema();
+ tupleSchema.add(new Schema.FieldSchema("node",DataType.INTEGER));
+ tupleSchema.add(new Schema.FieldSchema("rank",DataType.FLOAT));
+
+ return new Schema(new Schema.FieldSchema(getSchemaName(this.getClass()
+ .getName()
+ .toLowerCase(), input),
+ tupleSchema,
+ DataType.BAG));
+ }
+ catch (FrontendException e)
+ {
+ throw new RuntimeException(e);
+ }
+ }
+}
diff --git a/bigtop-tests/test-artifacts/datafu/src/main/groovy/org/apache/bigtop/itest/datafu/PigTests.java b/bigtop-tests/test-artifacts/datafu/src/main/groovy/org/apache/bigtop/itest/datafu/PigTests.java
new file mode 100644
index 00000000..1d4102be
--- /dev/null
+++ b/bigtop-tests/test-artifacts/datafu/src/main/groovy/org/apache/bigtop/itest/datafu/PigTests.java
@@ -0,0 +1,193 @@
+package org.apache.bigtop.itest.datafu;
+
+import static org.junit.Assert.*;
+
+import java.io.BufferedInputStream;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileWriter;
+import java.io.FilenameFilter;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.regex.Pattern;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.pigunit.PigTest;
+import org.apache.pig.tools.parameters.ParseException;
+
+public abstract class PigTests
+{
+ protected String[] getDefaultArgs()
+ {
+ String[] args = {
+ "JAR_PATH=" + getJarPath()
+ };
+ return args;
+ }
+
+ protected List<String> getDefaultArgsAsList()
+ {
+ String[] args = getDefaultArgs();
+ List<String> argsList = new ArrayList<String>(args.length);
+ for (String arg : args)
+ {
+ argsList.add(arg);
+ }
+ return argsList;
+ }
+
+ protected PigTest createPigTest(String scriptPath, String... args) throws IOException
+ {
+ // append args to list of default args
+ List<String> theArgs = getDefaultArgsAsList();
+ for (String arg : args)
+ {
+ theArgs.add(arg);
+ }
+
+ String[] lines = getLinesFromFile(scriptPath);
+
+ for (String arg : theArgs)
+ {
+ String[] parts = arg.split("=",2);
+ if (parts.length == 2)
+ {
+ for (int i=0; i<lines.length; i++)
+ {
+ lines[i] = lines[i].replaceAll(Pattern.quote("$" + parts[0]), parts[1]);
+ }
+ }
+ }
+
+ return new PigTest(lines);
+ }
+
+ protected PigTest createPigTest(String scriptPath) throws IOException
+ {
+ return createPigTest(scriptPath, getDefaultArgs());
+ }
+
+ protected String getJarPath()
+ {
+ String jarDir = "dist";
+ if (System.getProperty("datafu.jar.dir") != null)
+ {
+ jarDir = System.getProperty("datafu.jar.dir");
+ }
+
+ String jarDirPath = new File(/* System.getProperty("user.dir"), */ jarDir).getAbsolutePath();
+
+ File userDir = new File(jarDirPath);
+
+ String[] files = userDir.list(new FilenameFilter() {
+
+ @Override
+ public boolean accept(File dir, String name)
+ {
+ return name.startsWith("datafu") && name.endsWith(".jar") && !name.contains("sources") && !name.contains("javadoc");
+ }
+
+ });
+
+ if (files.length == 0)
+ {
+ throw new RuntimeException("Could not find JAR file");
+ }
+ else if (files.length > 1)
+ {
+ throw new RuntimeException("Found more JAR files than expected");
+ }
+
+ return userDir.getAbsolutePath() + "/" + files[0];
+ }
+
+ protected List<Tuple> getLinesForAlias(PigTest test, String alias) throws IOException, ParseException
+ {
+ return getLinesForAlias(test,alias,true);
+ }
+
+ protected List<Tuple> getLinesForAlias(PigTest test, String alias, boolean logValues) throws IOException, ParseException
+ {
+ Iterator<Tuple> tuplesIterator = test.getAlias(alias);
+ List<Tuple> tuples = new ArrayList<Tuple>();
+ if (logValues)
+ {
+ System.out.println(String.format("Values for %s: ", alias));
+ }
+ while (tuplesIterator.hasNext())
+ {
+ Tuple tuple = tuplesIterator.next();
+ if (logValues)
+ {
+ System.out.println(tuple.toString());
+ }
+ tuples.add(tuple);
+ }
+ return tuples;
+ }
+
+ protected void writeLinesToFile(String fileName, String... lines) throws IOException
+ {
+ File inputFile = deleteIfExists(getFile(fileName));
+ writeLinesToFile(inputFile, lines);
+ }
+
+ protected void writeLinesToFile(File file, String[] lines) throws IOException
+ {
+ FileWriter writer = new FileWriter(file);
+ for (String line : lines)
+ {
+ writer.write(line + "\n");
+ }
+ writer.close();
+ }
+
+ protected void assertOutput(PigTest test, String alias, String... expected) throws IOException, ParseException
+ {
+ List<Tuple> tuples = getLinesForAlias(test, alias);
+ assertEquals(expected.length, tuples.size());
+ int i=0;
+ for (String e : expected)
+ {
+ assertEquals(tuples.get(i++).toString(), e);
+ }
+ }
+
+ protected File deleteIfExists(File file)
+ {
+ if (file.exists())
+ {
+ file.delete();
+ }
+ return file;
+ }
+
+ protected File getFile(String fileName)
+ {
+ return new File(System.getProperty("user.dir"), fileName).getAbsoluteFile();
+ }
+
+ /**
+ * Gets the lines from a given file.
+ *
+ * @param relativeFilePath The path relative to the datafu-tests project.
+ * @return The lines from the file
+ * @throws IOException
+ */
+ protected String[] getLinesFromFile(String relativeFilePath) throws IOException
+ {
+ // assume that the working directory is the datafu-tests project
+ File file = new File(System.getProperty("user.dir"), relativeFilePath).getAbsoluteFile();
+ BufferedInputStream content = new BufferedInputStream(new FileInputStream(file));
+ Object[] lines = IOUtils.readLines(content).toArray();
+ String[] result = new String[lines.length];
+ for (int i=0; i<lines.length; i++)
+ {
+ result[i] = (String)lines[i];
+ }
+ return result;
+ }
+}
diff --git a/bigtop-tests/test-artifacts/datafu/src/main/groovy/org/apache/bigtop/itest/datafu/bags/BagTests.java b/bigtop-tests/test-artifacts/datafu/src/main/groovy/org/apache/bigtop/itest/datafu/bags/BagTests.java
new file mode 100644
index 00000000..361d065c
--- /dev/null
+++ b/bigtop-tests/test-artifacts/datafu/src/main/groovy/org/apache/bigtop/itest/datafu/bags/BagTests.java
@@ -0,0 +1,290 @@
+package org.apache.bigtop.itest.datafu.bags;
+
+import org.apache.pig.pigunit.PigTest;
+import org.junit.Test;
+
+import org.apache.bigtop.itest.datafu.PigTests;
+
+
+public class BagTests extends PigTests
+{
+ @Test
+ public void nullToEmptyBagTest() throws Exception
+ {
+ PigTest test = createPigTest("datafu/bags/nullToEmptyBagTest.pig");
+
+ writeLinesToFile("input",
+ "({(1),(2),(3),(4),(5)})",
+ "()",
+ "{(4),(5)})");
+
+ test.runScript();
+
+ assertOutput(test, "data2",
+ "({(1),(2),(3),(4),(5)})",
+ "({})",
+ "({(4),(5)})");
+ }
+
+ @Test
+ public void appendToBagTest() throws Exception
+ {
+ PigTest test = createPigTest("datafu/bags/appendToBagTest.pig");
+
+ writeLinesToFile("input",
+ "1\t{(1),(2),(3)}\t(4)",
+ "2\t{(10),(20),(30),(40),(50)}\t(60)");
+
+ test.runScript();
+
+ assertOutput(test, "data2",
+ "(1,{(1),(2),(3),(4)})",
+ "(2,{(10),(20),(30),(40),(50),(60)})");
+ }
+
+ @Test
+ public void firstTupleFromBagTest() throws Exception
+ {
+ PigTest test = createPigTest("datafu/bags/firstTupleFromBagTest.pig");
+
+ writeLinesToFile("input", "1\t{(4),(9),(16)}");
+
+ test.runScript();
+
+ assertOutput(test, "data2", "(1,(4))");
+ }
+
+
+ @Test
+ public void prependToBagTest() throws Exception
+ {
+ PigTest test = createPigTest("datafu/bags/prependToBagTest.pig");
+
+ writeLinesToFile("input",
+ "1\t{(1),(2),(3)}\t(4)",
+ "2\t{(10),(20),(30),(40),(50)}\t(60)");
+
+ test.runScript();
+
+ assertOutput(test, "data2",
+ "(1,{(4),(1),(2),(3)})",
+ "(2,{(60),(10),(20),(30),(40),(50)})");
+ }
+
+ @Test
+ public void bagConcatTest() throws Exception
+ {
+ PigTest test = createPigTest("datafu/bags/bagConcatTest.pig");
+
+ writeLinesToFile("input",
+ "({(1),(2),(3)}\t{(3),(5),(6)}\t{(10),(13)})",
+ "({(2),(3),(4)}\t{(5),(5)}\t{(20)})");
+
+ test.runScript();
+
+ assertOutput(test, "data2",
+ "({(1),(2),(3),(3),(5),(6),(10),(13)})",
+ "({(2),(3),(4),(5),(5),(20)})");
+ }
+
+ @Test
+ public void unorderedPairsTest() throws Exception
+ {
+ PigTest test = createPigTest("datafu/bags/unorderedPairsTests.pig");
+
+ String[] input = {
+ "{(1),(2),(3),(4),(5)}"
+ };
+
+ String[] output = {
+ "(1,2)",
+ "(1,3)",
+ "(1,4)",
+ "(1,5)",
+ "(2,3)",
+ "(2,4)",
+ "(2,5)",
+ "(3,4)",
+ "(3,5)",
+ "(4,5)"
+ };
+
+ test.assertOutput("data",input,"data4",output);
+ }
+
+ @Test
+ public void unorderedPairsTest2() throws Exception
+ {
+ PigTest test = createPigTest("datafu/bags/unorderedPairsTests2.pig");
+
+ this.writeLinesToFile("input", "1\t{(1),(2),(3),(4),(5)}");
+
+ String[] output = {
+ "(1,2)",
+ "(1,3)",
+ "(1,4)",
+ "(1,5)",
+ "(2,3)",
+ "(2,4)",
+ "(2,5)",
+ "(3,4)",
+ "(3,5)",
+ "(4,5)"
+ };
+
+ test.runScript();
+ this.getLinesForAlias(test, "data3");
+
+ this.assertOutput(test, "data3",
+ "(1,(1),(2))",
+ "(1,(1),(3))",
+ "(1,(1),(4))",
+ "(1,(1),(5))",
+ "(1,(2),(3))",
+ "(1,(2),(4))",
+ "(1,(2),(5))",
+ "(1,(3),(4))",
+ "(1,(3),(5))",
+ "(1,(4),(5))");
+ }
+
+ @Test
+ public void bagSplitTest() throws Exception
+ {
+ PigTest test = createPigTest("datafu/bags/bagSplitTest.pig",
+ "MAX=5");
+
+ writeLinesToFile("input",
+ "{(1,11),(2,22),(3,33),(4,44),(5,55),(6,66),(7,77),(8,88),(9,99),(10,1010),(11,1111),(12,1212)}");
+
+ test.runScript();
+
+ assertOutput(test, "data3",
+ "({(1,11),(2,22),(3,33),(4,44),(5,55)})",
+ "({(6,66),(7,77),(8,88),(9,99),(10,1010)})",
+ "({(11,1111),(12,1212)})");
+ }
+
+ @Test
+ public void bagSplitWithBagNumTest() throws Exception
+ {
+ PigTest test = createPigTest("datafu/bags/bagSplitWithBagNumTest.pig",
+ "MAX=10");
+
+ writeLinesToFile("input",
+ "{(1,11),(2,22),(3,33),(4,44),(5,55),(6,66),(7,77),(8,88),(9,99),(10,1010),(11,1111),(12,1212)}");
+
+ test.runScript();
+
+ assertOutput(test, "data3",
+ "({(1,11),(2,22),(3,33),(4,44),(5,55),(6,66),(7,77),(8,88),(9,99),(10,1010)},0)",
+ "({(11,1111),(12,1212)},1)");
+ }
+
+ @Test
+ public void enumerateWithReverseTest() throws Exception
+ {
+ PigTest test = createPigTest("datafu/bags/enumerateWithReverseTest.pig");
+
+ writeLinesToFile("input",
+ "({(10,{(1),(2),(3)}),(20,{(4),(5),(6)}),(30,{(7),(8)}),(40,{(9),(10),(11)}),(50,{(12),(13),(14),(15)})})");
+
+ test.runScript();
+
+ assertOutput(test, "data4",
+ "(10,{(1),(2),(3)},5)",
+ "(20,{(4),(5),(6)},4)",
+ "(30,{(7),(8)},3)",
+ "(40,{(9),(10),(11)},2)",
+ "(50,{(12),(13),(14),(15)},1)");
+ }
+
+ @Test
+ public void enumerateWithStartTest() throws Exception
+ {
+ PigTest test = createPigTest("datafu/bags/enumerateWithStartTest.pig");
+
+ writeLinesToFile("input",
+ "({(10,{(1),(2),(3)}),(20,{(4),(5),(6)}),(30,{(7),(8)}),(40,{(9),(10),(11)}),(50,{(12),(13),(14),(15)})})");
+
+ test.runScript();
+
+ assertOutput(test, "data4",
+ "(10,{(1),(2),(3)},1)",
+ "(20,{(4),(5),(6)},2)",
+ "(30,{(7),(8)},3)",
+ "(40,{(9),(10),(11)},4)",
+ "(50,{(12),(13),(14),(15)},5)");
+ }
+
+ @Test
+ public void enumerateTest() throws Exception
+ {
+ PigTest test = createPigTest("datafu/bags/enumerateTest.pig");
+
+ writeLinesToFile("input",
+ "({(10,{(1),(2),(3)}),(20,{(4),(5),(6)}),(30,{(7),(8)}),(40,{(9),(10),(11)}),(50,{(12),(13),(14),(15)})})");
+
+ test.runScript();
+
+ assertOutput(test, "data4",
+ "(10,{(1),(2),(3)},0)",
+ "(20,{(4),(5),(6)},1)",
+ "(30,{(7),(8)},2)",
+ "(40,{(9),(10),(11)},3)",
+ "(50,{(12),(13),(14),(15)},4)");
+ }
+
+ @Test
+ public void comprehensiveBagSplitAndEnumerate() throws Exception
+ {
+ PigTest test = createPigTest("datafu/bags/comprehensiveBagSplitAndEnumerate.pig");
+
+ writeLinesToFile("input",
+ "({(A,1.0),(B,2.0),(C,3.0),(D,4.0),(E,5.0)})");
+
+ test.runScript();
+
+ assertOutput(test, "data_out",
+ // bag #1
+ "(A,1.0,1)",
+ "(B,2.0,1)",
+ "(C,3.0,1)",
+ // bag #2
+ "(D,4.0,2)",
+ "(E,5.0,2)");
+ }
+
+ @Test
+ public void aliasBagFieldsTest() throws Exception
+ {
+ PigTest test = createPigTest("datafu/bags/aliasBagFieldsTest.pig");
+
+ writeLinesToFile("input",
+ "({(A,1,0),(B,2,0),(C,3,0),(D,4,0),(E,5,0)})");
+
+ test.runScript();
+
+ assertOutput(test, "data4",
+ "(A,1)",
+ "(B,2)",
+ "(C,3)",
+ "(D,4)",
+ "(E,5)");
+ }
+
+ @Test
+ public void distinctByTest() throws Exception
+ {
+ PigTest test = createPigTest("datafu/bags/distinctByTest.pig");
+
+ writeLinesToFile("input",
+ "({(Z,1,0),(A,1,0),(A,1,0),(B,2,0),(B,22,1),(C,3,0),(D,4,0),(E,5,0)})");
+
+ test.runScript();
+
+ assertOutput(test, "data2",
+ "({(Z,1,0),(A,1,0),(B,2,0),(C,3,0),(D,4,0),(E,5,0)})");
+ }
+
+}
diff --git a/bigtop-tests/test-artifacts/datafu/src/main/groovy/org/apache/bigtop/itest/datafu/bags/sets/SetTests.java b/bigtop-tests/test-artifacts/datafu/src/main/groovy/org/apache/bigtop/itest/datafu/bags/sets/SetTests.java
new file mode 100644
index 00000000..7a96f1b4
--- /dev/null
+++ b/bigtop-tests/test-artifacts/datafu/src/main/groovy/org/apache/bigtop/itest/datafu/bags/sets/SetTests.java
@@ -0,0 +1,56 @@
+package org.apache.bigtop.itest.datafu.bags.sets;
+
+import org.apache.pig.pigunit.PigTest;
+import org.junit.Test;
+
+import org.apache.bigtop.itest.datafu.PigTests;
+
+public class SetTests extends PigTests
+{
+ @Test
+ public void setIntersectTest() throws Exception
+ {
+ PigTest test = createPigTest("datafu/bags/sets/setIntersectTest.pig");
+
+ String[] input = {
+ "{(1,10),(2,20),(3,30),(4,40),(5,50),(6,60)}\t{(0,0),(2,20),(4,40),(8,80)}",
+ "{(1,10),(1,10),(2,20),(3,30),(3,30),(4,40),(4,40)}\t{(1,10),(3,30)}"
+ };
+
+ String[] output = {
+ "({(2,20),(4,40)})",
+ "({(1,10),(3,30)})"
+ };
+
+ test.assertOutput("data",input,"data2",output);
+ }
+
+ @Test
+ public void setIntersectOutOfOrderTest() throws Exception
+ {
+ PigTest test = createPigTest("datafu/bags/sets/setIntersectTest.pig");
+
+ this.writeLinesToFile("input",
+ "{(1,10),(3,30),(2,20),(4,40),(5,50),(6,60)}\t{(0,0),(2,20),(4,40),(8,80)}");
+
+ test.runScript();
+
+ this.getLinesForAlias(test, "data2");
+ }
+
+ @Test
+ public void setUnionTest() throws Exception
+ {
+ PigTest test = createPigTest("datafu/bags/sets/setUnionTest.pig");
+
+ String[] input = {
+ "{(1,10),(1,20),(1,30),(1,40),(1,50),(1,60),(1,80)}\t{(1,1),(1,20),(1,25),(1,25),(1,25),(1,40),(1,70),(1,80)}"
+ };
+
+ String[] output = {
+ "({(1,10),(1,20),(1,30),(1,40),(1,50),(1,60),(1,80),(1,1),(1,25),(1,70)})"
+ };
+
+ test.assertOutput("data",input,"data2",output);
+ }
+}
diff --git a/bigtop-tests/test-artifacts/datafu/src/main/groovy/org/apache/bigtop/itest/datafu/date/TimeTests.java b/bigtop-tests/test-artifacts/datafu/src/main/groovy/org/apache/bigtop/itest/datafu/date/TimeTests.java
new file mode 100644
index 00000000..940adb2d
--- /dev/null
+++ b/bigtop-tests/test-artifacts/datafu/src/main/groovy/org/apache/bigtop/itest/datafu/date/TimeTests.java
@@ -0,0 +1,47 @@
+package org.apache.bigtop.itest.datafu.date;
+
+import org.apache.pig.pigunit.PigTest;
+import org.junit.Test;
+
+import org.apache.bigtop.itest.datafu.PigTests;
+
+public class TimeTests extends PigTests
+{
+ @Test
+ public void timeCountPageViewsTest() throws Exception
+ {
+ PigTest test = createPigTest("datafu/date/timeCountPageViewsTest.pig",
+ "TIME_WINDOW=30m",
+ "JAR_PATH=" + getJarPath());
+
+ String[] input = {
+ "1\t100\t2010-01-01T01:00:00Z",
+ "1\t100\t2010-01-01T01:15:00Z",
+ "1\t100\t2010-01-01T01:31:00Z",
+ "1\t100\t2010-01-01T01:35:00Z",
+ "1\t100\t2010-01-01T02:30:00Z",
+
+ "1\t101\t2010-01-01T01:00:00Z",
+ "1\t101\t2010-01-01T01:31:00Z",
+ "1\t101\t2010-01-01T02:10:00Z",
+ "1\t101\t2010-01-01T02:40:30Z",
+ "1\t101\t2010-01-01T03:30:00Z",
+
+ "1\t102\t2010-01-01T01:00:00Z",
+ "1\t102\t2010-01-01T01:01:00Z",
+ "1\t102\t2010-01-01T01:02:00Z",
+ "1\t102\t2010-01-01T01:10:00Z",
+ "1\t102\t2010-01-01T01:15:00Z",
+ "1\t102\t2010-01-01T01:25:00Z",
+ "1\t102\t2010-01-01T01:30:00Z"
+ };
+
+ String[] output = {
+ "(1,100,2)",
+ "(1,101,5)",
+ "(1,102,1)"
+ };
+
+ test.assertOutput("views",input,"view_counts",output);
+ }
+}
diff --git a/bigtop-tests/test-artifacts/datafu/src/main/groovy/org/apache/bigtop/itest/datafu/geo/GeoTests.java b/bigtop-tests/test-artifacts/datafu/src/main/groovy/org/apache/bigtop/itest/datafu/geo/GeoTests.java
new file mode 100644
index 00000000..efd8c389
--- /dev/null
+++ b/bigtop-tests/test-artifacts/datafu/src/main/groovy/org/apache/bigtop/itest/datafu/geo/GeoTests.java
@@ -0,0 +1,57 @@
+package org.apache.bigtop.itest.datafu.geo;
+
+import static org.junit.Assert.*;
+
+import java.util.List;
+
+import org.apache.pig.data.Tuple;
+import org.apache.pig.pigunit.PigTest;
+import org.junit.Test;
+
+import org.apache.bigtop.itest.datafu.PigTests;
+
+public class GeoTests extends PigTests
+{
+ @Test
+ public void haversineTest() throws Exception
+ {
+ PigTest test = createPigTest("datafu/geo/haversineTest.pig");
+
+ // Approximate latitude and longitude for major cities from maps.google.com
+ double[] la = {34.040143,-118.243103};
+ double[] tokyo = {35.637209,139.65271};
+ double[] ny = {40.716038,-73.99498};
+ double[] paris = {48.857713,2.342491};
+ double[] sydney = {-33.872696,151.195221};
+
+ this.writeLinesToFile("input",
+ coords(la,tokyo),
+ coords(ny,tokyo),
+ coords(ny,sydney),
+ coords(ny,paris));
+
+ test.runScript();
+
+ List<Tuple> distances = this.getLinesForAlias(test, "data2");
+
+ // ensure distance is within 20 miles of expected (distances found online)
+ assertWithin(5478.0, distances.get(0), 20.0); // la <-> tokyo
+ assertWithin(6760.0, distances.get(1), 20.0); // ny <-> tokyo
+ assertWithin(9935.0, distances.get(2), 20.0); // ny <-> sydney
+ assertWithin(3635.0, distances.get(3), 20.0); // ny <-> paris
+
+ }
+
+ private void assertWithin(double expected, Tuple actual, double maxDiff) throws Exception
+ {
+ Double actualVal = (Double)actual.get(0);
+ assertTrue(Math.abs(expected-actualVal) < maxDiff);
+ }
+
+ private String coords(double[] coords1, double[] coords2)
+ {
+ assertTrue(coords1.length == 2);
+ assertTrue(coords2.length == 2);
+ return String.format("%f\t%f\t%f\t%f", coords1[0], coords1[1], coords2[0], coords2[1]);
+ }
+}
diff --git a/bigtop-tests/test-artifacts/datafu/src/main/groovy/org/apache/bigtop/itest/datafu/hash/HashTests.java b/bigtop-tests/test-artifacts/datafu/src/main/groovy/org/apache/bigtop/itest/datafu/hash/HashTests.java
new file mode 100644
index 00000000..6408c358
--- /dev/null
+++ b/bigtop-tests/test-artifacts/datafu/src/main/groovy/org/apache/bigtop/itest/datafu/hash/HashTests.java
@@ -0,0 +1,45 @@
+package org.apache.bigtop.itest.datafu.hash;
+
+import org.apache.pig.pigunit.PigTest;
+import org.junit.Test;
+
+import org.apache.bigtop.itest.datafu.PigTests;
+
+public class HashTests extends PigTests
+{
+ @Test
+ public void md5Test() throws Exception
+ {
+ PigTest test = createPigTest("datafu/hash/md5Test.pig");
+
+ writeLinesToFile("input",
+ "ladsljkasdglk",
+ "lkadsljasgjskdjks",
+ "aladlasdgjks");
+
+ test.runScript();
+
+ assertOutput(test, "data_out",
+ "(d9a82575758bb4978949dc0659205cc6)",
+ "(9ec37f02fae0d8d6a7f4453a62272f1f)",
+ "(cb94139a8b9f3243e68a898ec6bd9b3d)");
+ }
+
+ @Test
+ public void md5Base64Test() throws Exception
+ {
+ PigTest test = createPigTest("datafu/hash/md5Base64Test.pig");
+
+ writeLinesToFile("input",
+ "ladsljkasdglk",
+ "lkadsljasgjskdjks",
+ "aladlasdgjks");
+
+ test.runScript();
+
+ assertOutput(test, "data_out",
+ "(2agldXWLtJeJSdwGWSBcxg==)",
+ "(nsN/Avrg2Nan9EU6YicvHw==)",
+ "(y5QTmoufMkPmiomOxr2bPQ==)");
+ }
+}
diff --git a/bigtop-tests/test-artifacts/datafu/src/main/groovy/org/apache/bigtop/itest/datafu/linkanalysis/PageRankTest.java b/bigtop-tests/test-artifacts/datafu/src/main/groovy/org/apache/bigtop/itest/datafu/linkanalysis/PageRankTest.java
new file mode 100644
index 00000000..90db9b07
--- /dev/null
+++ b/bigtop-tests/test-artifacts/datafu/src/main/groovy/org/apache/bigtop/itest/datafu/linkanalysis/PageRankTest.java
@@ -0,0 +1,281 @@
+package org.apache.bigtop.itest.datafu.linkanalysis;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.junit.Test;
+
+public class PageRankTest
+{
+ @Test
+ public void wikipediaGraphInMemoryTest() throws Exception {
+ System.out.println();
+ System.out.println("Starting wikipediaGraphInMemoryTest");
+
+ datafu.linkanalysis.PageRank graph = new datafu.linkanalysis.PageRank();
+
+ String[] edges = getWikiExampleEdges();
+
+ Map<String,Integer> nodeIdsMap = loadGraphFromEdgeList(graph, edges);
+
+ // Without dangling node handling we will not get the true page rank since the total rank will
+ // not add to 1.0. Without dangling node handling some of the page rank drains out of the graph.
+ graph.enableDanglingNodeHandling();
+
+ performIterations(graph, 150, 1e-18f);
+
+ String[] expectedRanks = getWikiExampleExpectedRanks();
+
+ Map<String,Float> expectedRanksMap = parseExpectedRanks(expectedRanks);
+
+ validateExpectedRanks(graph, nodeIdsMap, expectedRanksMap);
+ }
+
+ @Test
+ public void wikipediaGraphDiskCacheTest() throws Exception {
+ System.out.println();
+ System.out.println("Starting wikipediaGraphDiskCacheTest");
+
+ datafu.linkanalysis.PageRank graph = new datafu.linkanalysis.PageRank();
+
+ String[] edges = getWikiExampleEdges();
+
+ graph.enableEdgeDiskCaching();
+ graph.setEdgeCachingThreshold(5);
+
+ Map<String,Integer> nodeIdsMap = loadGraphFromEdgeList(graph, edges);
+
+ assert graph.isUsingEdgeDiskCache() : "Expected disk cache to be used";
+
+ // Without dangling node handling we will not get the true page rank since the total rank will
+ // not add to 1.0. Without dangling node handling some of the page rank drains out of the graph.
+ graph.enableDanglingNodeHandling();
+
+ performIterations(graph, 150, 1e-18f);
+
+ String[] expectedRanks = getWikiExampleExpectedRanks();
+
+ Map<String,Float> expectedRanksMap = parseExpectedRanks(expectedRanks);
+
+ validateExpectedRanks(graph, nodeIdsMap, expectedRanksMap);
+ }
+
+ @Test
+ public void hubAndSpokeInMemoryTest() throws Exception {
+ System.out.println();
+ System.out.println("Starting hubAndSpokeInMemoryTest");
+
+ datafu.linkanalysis.PageRank graph = new datafu.linkanalysis.PageRank();
+
+ String[] edges = getHubAndSpokeEdges();
+
+ Map<String,Integer> nodeIdsMap = loadGraphFromEdgeList(graph, edges);
+
+ graph.enableDanglingNodeHandling();
+
+ performIterations(graph, 150, 1e-18f);
+
+ // no need to validate, this is just a perf test for runtime comparison
+ }
+
+ @Test
+ public void hubAndSpokeDiskCacheTest() throws Exception {
+ System.out.println();
+ System.out.println("Starting hubAndSpokeDiskCacheTest");
+
+ datafu.linkanalysis.PageRank graph = new datafu.linkanalysis.PageRank();
+
+ String[] edges = getHubAndSpokeEdges();
+
+ graph.enableEdgeDiskCaching();
+ graph.setEdgeCachingThreshold(5);
+
+ Map<String,Integer> nodeIdsMap = loadGraphFromEdgeList(graph, edges);
+
+ graph.enableDanglingNodeHandling();
+
+ performIterations(graph, 150, 1e-18f);
+
+ // no need to validate, this is just a perf test for runtime comparison
+ }
+
+ private String[] getHubAndSpokeEdges()
+ {
+ int count = 50000;
+ String[] edges = new String[count];
+
+ for (int i=0; i<count; i++)
+ {
+ edges[i] = String.format("S%d H", i);
+ }
+ return edges;
+ }
+
+ public static String[] getWikiExampleEdges()
+ {
+ // graph taken from:
+ // http://en.wikipedia.org/wiki/PageRank
+ String[] edges = {
+ "B C",
+ "C B",
+ "D A",
+ "D B",
+ "E D",
+ "E B",
+ "E F",
+ "F E",
+ "F B",
+ "P1 B",
+ "P1 E",
+ "P2 B",
+ "P2 E",
+ "P3 B",
+ "P3 E",
+ "P4 E",
+ "P5 E"
+ };
+ return edges;
+ }
+
+ public static String[] getWikiExampleExpectedRanks()
+ {
+ // these ranks come from the Wikipedia page:
+ // http://en.wikipedia.org/wiki/PageRank
+ String[] expectedRanks = {
+ "A 3.3",
+ "B 38.4",
+ "C 34.3",
+ "D 3.9",
+ "E 8.1",
+ "F 3.9",
+ "P1 1.6",
+ "P2 1.6",
+ "P3 1.6",
+ "P4 1.6",
+ "P5 1.6"
+ };
+ return expectedRanks;
+ }
+
+ private Map<String,Integer> loadGraphFromEdgeList(datafu.linkanalysis.PageRank graph, String[] edges) throws IOException
+ {
+ Map<Integer,ArrayList<Map<String,Object>>> nodeEdgesMap = new HashMap<Integer,ArrayList<Map<String,Object>>>();
+ Map<String,Integer> nodeIdsMap = new HashMap<String,Integer>();
+
+ for (String edge : edges)
+ {
+ String[] parts = edge.split(" ");
+ assert parts.length == 2 : "Expected two parts";
+
+ int sourceId = getOrCreateId(parts[0], nodeIdsMap);
+ int destId = getOrCreateId(parts[1], nodeIdsMap);
+
+ Map<String,Object> edgeMap = new HashMap<String,Object>();
+ edgeMap.put("weight", 1.0);
+ edgeMap.put("dest", destId);
+
+ ArrayList<Map<String,Object>> nodeEdges = null;
+
+ if (nodeEdgesMap.containsKey(sourceId))
+ {
+ nodeEdges = nodeEdgesMap.get(sourceId);
+ }
+ else
+ {
+ nodeEdges = new ArrayList<Map<String,Object>>();
+ nodeEdgesMap.put(sourceId, nodeEdges);
+ }
+
+ nodeEdges.add(edgeMap);
+ }
+
+ for (Map.Entry<Integer, ArrayList<Map<String,Object>>> e : nodeEdgesMap.entrySet())
+ {
+ graph.addEdges(e.getKey(), e.getValue());
+ }
+
+ return nodeIdsMap;
+ }
+
+ private void performIterations(datafu.linkanalysis.PageRank graph, int maxIters, float tolerance) throws IOException
+ {
+ System.out.println(String.format("Beginning iteration (maxIters = %d, tolerance=%e)", maxIters, tolerance));
+
+ datafu.linkanalysis.PageRank.ProgressIndicator progressIndicator = getDummyProgressIndicator();
+
+ System.out.println("Initializing graph");
+ long startTime = System.nanoTime();
+ graph.init(progressIndicator);
+ System.out.println(String.format("Done, took %f ms", (System.nanoTime() - startTime)/10.0e6));
+
+ float totalDiff;
+ int iter = 0;
+
+ System.out.println("Beginning iterations");
+ startTime = System.nanoTime();
+ do
+ {
+ totalDiff = graph.nextIteration(progressIndicator);
+ iter++;
+ } while(iter < maxIters && totalDiff > tolerance);
+ System.out.println(String.format("Done, took %f ms", (System.nanoTime() - startTime)/10.0e6));
+ }
+
+ private datafu.linkanalysis.PageRank.ProgressIndicator getDummyProgressIndicator()
+ {
+ return new datafu.linkanalysis.PageRank.ProgressIndicator()
+ {
+ @Override
+ public void progress()
+ {
+ // do nothing
+ }
+ };
+ }
+
+ private void validateExpectedRanks(datafu.linkanalysis.PageRank graph, Map<String,Integer> nodeIds, Map<String,Float> expectedRanks)
+ {
+ System.out.println("Validating page rank results");
+
+ for (Map.Entry<String,Integer> e : nodeIds.entrySet())
+ {
+ float rank = graph.getNodeRank(e.getValue());
+
+ float expectedRank = expectedRanks.get(e.getKey());
+ // require 0.1% accuracy
+ assert (Math.abs(expectedRank - rank*100.0f) < 0.1) : String.format("Did not get expected rank for %s", e.getKey());
+ }
+
+ System.out.println("All ranks match expected");
+ }
+
+ public static Map<String,Float> parseExpectedRanks(String[] expectedRanks)
+ {
+ Map<String,Float> expectedRanksMap = new HashMap<String,Float>();
+ for (String expectedRankString : expectedRanks)
+ {
+ String[] parts = expectedRankString.split(" ");
+ assert parts.length == 2 : "Expected two parts";
+ String name = parts[0];
+ Float expectedRank = Float.parseFloat(parts[1]);
+ expectedRanksMap.put(name, expectedRank);
+ }
+ return expectedRanksMap;
+ }
+
+ private Integer getOrCreateId(String name, Map<String,Integer> nodeIds)
+ {
+ if (nodeIds.containsKey(name))
+ {
+ return nodeIds.get(name);
+ }
+ else
+ {
+ Integer id = nodeIds.size();
+ nodeIds.put(name, id);
+ return id;
+ }
+ }
+}
diff --git a/bigtop-tests/test-artifacts/datafu/src/main/groovy/org/apache/bigtop/itest/datafu/linkanalysis/PageRankTests.java b/bigtop-tests/test-artifacts/datafu/src/main/groovy/org/apache/bigtop/itest/datafu/linkanalysis/PageRankTests.java
new file mode 100644
index 00000000..8e0a0150
--- /dev/null
+++ b/bigtop-tests/test-artifacts/datafu/src/main/groovy/org/apache/bigtop/itest/datafu/linkanalysis/PageRankTests.java
@@ -0,0 +1,102 @@
+package org.apache.bigtop.itest.datafu.linkanalysis;
+
+
+import static org.junit.Assert.*;
+
+import java.io.BufferedWriter;
+import java.io.File;
+import java.io.FileWriter;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+
+import org.apache.pig.data.Tuple;
+import org.apache.pig.pigunit.PigTest;
+import org.junit.Test;
+
+
+import org.apache.bigtop.itest.datafu.linkanalysis.PageRankTest;
+import org.apache.bigtop.itest.datafu.PigTests;
+
+public class PageRankTests extends PigTests
+{
+ @Test
+ public void pigPageRankTest() throws Exception
+ {
+ PigTest test = createPigTest("datafu/linkanalysis/pageRankTest.pig");
+
+ String[] edges = PageRankTest.getWikiExampleEdges();
+
+ Map<String,Integer> nodeIds = new HashMap<String,Integer>();
+ Map<Integer,String> nodeIdsReversed = new HashMap<Integer,String>();
+ Map<String,Float> expectedRanks = PageRankTest.parseExpectedRanks(PageRankTest.getWikiExampleExpectedRanks());
+
+ File f = new File(System.getProperty("user.dir"), "input").getAbsoluteFile();
+ if (f.exists())
+ {
+ f.delete();
+ }
+
+ FileWriter writer = new FileWriter(f);
+ BufferedWriter bufferedWriter = new BufferedWriter(writer);
+
+ for (String edge : edges)
+ {
+ String[] edgeParts = edge.split(" ");
+ String source = edgeParts[0];
+ String dest = edgeParts[1];
+ if (!nodeIds.containsKey(source))
+ {
+ int id = nodeIds.size();
+ nodeIds.put(source,id);
+ nodeIdsReversed.put(id, source);
+ }
+ if (!nodeIds.containsKey(dest))
+ {
+ int id = nodeIds.size();
+ nodeIds.put(dest,id);
+ nodeIdsReversed.put(id, dest);
+ }
+ Integer sourceId = nodeIds.get(source);
+ Integer destId = nodeIds.get(dest);
+
+ StringBuffer sb = new StringBuffer();
+
+ sb.append("1\t"); // topic
+ sb.append(sourceId.toString() + "\t");
+ sb.append(destId.toString() + "\t");
+ sb.append("1.0\n"); // weight
+
+ bufferedWriter.write(sb.toString());
+ }
+
+ bufferedWriter.close();
+
+ test.runScript();
+ Iterator<Tuple> tuples = test.getAlias("data_grouped3");
+
+ System.out.println("Final node ranks:");
+ int nodeCount = 0;
+ while (tuples.hasNext())
+ {
+ Tuple nodeTuple = tuples.next();
+
+ Integer topic = (Integer)nodeTuple.get(0);
+ Integer nodeId = (Integer)nodeTuple.get(1);
+ Float nodeRank = (Float)nodeTuple.get(2);
+
+ assertEquals(1, topic.intValue());
+
+ System.out.println(String.format("%d => %f", nodeId, nodeRank));
+
+ Float expectedNodeRank = expectedRanks.get(nodeIdsReversed.get(nodeId));
+
+ assertTrue(String.format("expected: %f, actual: %f", expectedNodeRank, nodeRank),
+ Math.abs(expectedNodeRank - nodeRank * 100.0f) < 0.1);
+
+ nodeCount++;
+ }
+
+ assertEquals(nodeIds.size(),nodeCount);
+ }
+}
diff --git a/bigtop-tests/test-artifacts/datafu/src/main/groovy/org/apache/bigtop/itest/datafu/numbers/NumberTests.java b/bigtop-tests/test-artifacts/datafu/src/main/groovy/org/apache/bigtop/itest/datafu/numbers/NumberTests.java
new file mode 100644
index 00000000..001b30de
--- /dev/null
+++ b/bigtop-tests/test-artifacts/datafu/src/main/groovy/org/apache/bigtop/itest/datafu/numbers/NumberTests.java
@@ -0,0 +1,47 @@
+package org.apache.bigtop.itest.datafu.numbers;
+
+import static org.junit.Assert.*;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.pig.data.Tuple;
+import org.apache.pig.pigunit.PigTest;
+import org.junit.Test;
+
+import org.apache.bigtop.itest.datafu.PigTests;
+
+public class NumberTests extends PigTests
+{
+ /**
+ * Test the RandomIntRange UDF. The main purpose is to make sure it can be used in a Pig script.
+ * Also the range of output values is tested.
+ *
+ * @throws Exception
+ */
+ @Test
+ public void randomIntRangeTest() throws Exception
+ {
+ PigTest test = createPigTest("datafu/numbers/randomIntRangeTest.pig",
+ "MIN=1", "MAX=10");
+
+ List<String> input = new ArrayList<String>();
+ for (int i=0; i<100; i++)
+ {
+ input.add(String.format("(%d)", i));
+ }
+
+ writeLinesToFile("input",
+ input.toArray(new String[0]));
+
+ test.runScript();
+
+ List<Tuple> tuples = getLinesForAlias(test, "data2", false);
+ for (Tuple tuple : tuples)
+ {
+ Integer randValue = (Integer)tuple.get(1);
+ assertTrue(randValue >= 1);
+ assertTrue(randValue <= 10);
+ }
+ }
+}
diff --git a/bigtop-tests/test-artifacts/datafu/src/main/groovy/org/apache/bigtop/itest/datafu/sessions/SessionTests.java b/bigtop-tests/test-artifacts/datafu/src/main/groovy/org/apache/bigtop/itest/datafu/sessions/SessionTests.java
new file mode 100644
index 00000000..78057671
--- /dev/null
+++ b/bigtop-tests/test-artifacts/datafu/src/main/groovy/org/apache/bigtop/itest/datafu/sessions/SessionTests.java
@@ -0,0 +1,74 @@
+package org.apache.bigtop.itest.datafu.sessions;
+
+import static org.junit.Assert.*;
+
+import java.util.HashMap;
+
+import org.apache.pig.data.Tuple;
+import org.apache.pig.pigunit.PigTest;
+import org.junit.Test;
+
+import org.apache.bigtop.itest.datafu.PigTests;
+
+public class SessionTests extends PigTests
+{
+ @Test
+ public void sessionizeTest() throws Exception
+ {
+ PigTest test = createPigTest("datafu/sessions/sessionizeTest.pig",
+ "TIME_WINDOW=30m",
+ "JAR_PATH=" + getJarPath());
+
+ this.writeLinesToFile("input",
+ "2010-01-01T01:00:00Z\t1\t10",
+ "2010-01-01T01:15:00Z\t1\t20",
+ "2010-01-01T01:31:00Z\t1\t10",
+ "2010-01-01T01:35:00Z\t1\t20",
+ "2010-01-01T02:30:00Z\t1\t30",
+
+ "2010-01-01T01:00:00Z\t2\t10",
+ "2010-01-01T01:31:00Z\t2\t20",
+ "2010-01-01T02:10:00Z\t2\t30",
+ "2010-01-01T02:40:30Z\t2\t40",
+ "2010-01-01T03:30:00Z\t2\t50",
+
+ "2010-01-01T01:00:00Z\t3\t10",
+ "2010-01-01T01:01:00Z\t3\t20",
+ "2010-01-01T01:02:00Z\t3\t5",
+ "2010-01-01T01:10:00Z\t3\t25",
+ "2010-01-01T01:15:00Z\t3\t50",
+ "2010-01-01T01:25:00Z\t3\t30",
+ "2010-01-01T01:30:00Z\t3\t15");
+
+ test.runScript();
+
+ HashMap<Integer,HashMap<Integer,Boolean>> userValues = new HashMap<Integer,HashMap<Integer,Boolean>>();
+
+ for (Tuple t : this.getLinesForAlias(test, "max_value"))
+ {
+ Integer userId = (Integer)t.get(0);
+ Integer max = (Integer)t.get(1);
+ if (!userValues.containsKey(userId))
+ {
+ userValues.put(userId, new HashMap<Integer,Boolean>());
+ }
+ userValues.get(userId).put(max, true);
+ }
+
+ assertEquals(userValues.get(1).size(), 2);
+ assertEquals(userValues.get(2).size(), 5);
+ assertEquals(userValues.get(3).size(), 1);
+
+ assertTrue(userValues.get(1).containsKey(20));
+ assertTrue(userValues.get(1).containsKey(30));
+
+ assertTrue(userValues.get(2).containsKey(10));
+ assertTrue(userValues.get(2).containsKey(20));
+ assertTrue(userValues.get(2).containsKey(30));
+ assertTrue(userValues.get(2).containsKey(40));
+ assertTrue(userValues.get(2).containsKey(50));
+
+ assertTrue(userValues.get(3).containsKey(50));
+ }
+}
+
diff --git a/bigtop-tests/test-artifacts/datafu/src/main/groovy/org/apache/bigtop/itest/datafu/stats/MarkovPairTests.java b/bigtop-tests/test-artifacts/datafu/src/main/groovy/org/apache/bigtop/itest/datafu/stats/MarkovPairTests.java
new file mode 100644
index 00000000..87e34f41
--- /dev/null
+++ b/bigtop-tests/test-artifacts/datafu/src/main/groovy/org/apache/bigtop/itest/datafu/stats/MarkovPairTests.java
@@ -0,0 +1,87 @@
+package org.apache.bigtop.itest.datafu.stats;
+
+import static org.junit.Assert.*;
+
+import java.util.Iterator;
+
+import org.apache.pig.data.Tuple;
+import org.apache.pig.pigunit.PigTest;
+import org.junit.Test;
+
+import org.apache.bigtop.itest.datafu.PigTests;
+
+public class MarkovPairTests extends PigTests
+{
+ @Test
+ public void markovPairDefaultTest() throws Exception
+ {
+ PigTest test = createPigTest("datafu/stats/markovPairDefault.pig",
+ "schema=(data: bag {t: tuple(val:int)})");
+
+ writeLinesToFile("input", "{(10),(20),(30),(40),(50),(60)}");
+
+ String[] expectedOutput = {
+ "({((10),(20)),((20),(30)),((30),(40)),((40),(50)),((50),(60))})"
+ };
+
+ test.runScript();
+
+ Iterator<Tuple> actualOutput = test.getAlias("data_out");
+
+ assertTuplesMatch(expectedOutput, actualOutput);
+ }
+
+ @Test
+ public void markovPairMultipleInput() throws Exception
+ {
+ PigTest test = createPigTest("datafu/stats/markovPairDefault.pig",
+ "schema=(data: bag {t: tuple(val1:int,val2:int)})");
+
+ writeLinesToFile("input", "{(10,100),(20,200),(30,300),(40,400),(50,500),(60,600)}");
+
+ String[] expectedOutput = {
+ "({((10,100),(20,200)),((20,200),(30,300)),((30,300),(40,400)),((40,400),(50,500)),((50,500),(60,600))})"
+ };
+
+
+ test.runScript();
+
+ Iterator<Tuple> actualOutput = test.getAlias("data_out");
+
+ assertTuplesMatch(expectedOutput, actualOutput);
+ }
+
+ @Test
+ public void markovPairLookaheadTest() throws Exception
+ {
+ PigTest test = createPigTest("datafu/stats/markovPairLookahead.pig",
+ "schema=(data: bag {t: tuple(val:int)})",
+ "lookahead=3");
+
+ writeLinesToFile("input", "{(10),(20),(30),(40),(50)}");
+
+ String[] expectedOutput = {
+ "({((10),(20)),((10),(30)),((10),(40)),((20),(30)),((20),(40)),((20),(50)),((30),(40)),((30),(50)),((40),(50))})"
+ };
+
+ test.runScript();
+
+ Iterator<Tuple> actualOutput = test.getAlias("data_out");
+
+ assertTuplesMatch(expectedOutput, actualOutput);
+ }
+
+ private void assertTuplesMatch(String[] expectedOutput, Iterator<Tuple> actualOutput)
+ {
+ Iterator<Tuple> tuples = actualOutput;
+
+ for (String outputLine : expectedOutput)
+ {
+ assertTrue(tuples.hasNext());
+ Tuple outputTuple = tuples.next();
+ System.out.println(String.format("expected: %s", outputLine));
+ System.out.println(String.format("actual: %s", outputTuple.toString()));
+ assertEquals(outputLine,outputTuple.toString());
+ }
+ }
+}
diff --git a/bigtop-tests/test-artifacts/datafu/src/main/groovy/org/apache/bigtop/itest/datafu/stats/QuantileTests.java b/bigtop-tests/test-artifacts/datafu/src/main/groovy/org/apache/bigtop/itest/datafu/stats/QuantileTests.java
new file mode 100644
index 00000000..a2e5a54c
--- /dev/null
+++ b/bigtop-tests/test-artifacts/datafu/src/main/groovy/org/apache/bigtop/itest/datafu/stats/QuantileTests.java
@@ -0,0 +1,178 @@
+package org.apache.bigtop.itest.datafu.stats;
+
+import static org.junit.Assert.*;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.pig.data.Tuple;
+import org.apache.pig.pigunit.PigTest;
+import org.junit.Test;
+
+import org.apache.bigtop.itest.datafu.PigTests;
+
+public class QuantileTests extends PigTests
+{
+ @Test
+ public void quantileTest() throws Exception
+ {
+ PigTest test = createPigTest("datafu/stats/quantileTest.pig",
+ "QUANTILES='0.0','0.25','0.5','0.75','1.0'");
+
+ String[] input = {"1","2","3","4","10","5","6","7","8","9"};
+ writeLinesToFile("input", input);
+
+ test.runScript();
+
+ List<Tuple> output = getLinesForAlias(test, "data_out", true);
+
+ assertEquals(output.size(),1);
+ assertEquals(output.get(0).toString(), "(1.0,3.0,5.5,8.0,10.0)");
+ }
+
+ @Test
+ public void quantile2Test() throws Exception
+ {
+ PigTest test = createPigTest("datafu/stats/quantileTest.pig",
+ "QUANTILES='5'");
+
+ String[] input = {"1","2","3","4","10","5","6","7","8","9"};
+ writeLinesToFile("input", input);
+
+ test.runScript();
+
+ List<Tuple> output = getLinesForAlias(test, "data_out", true);
+
+ assertEquals(output.size(),1);
+ assertEquals(output.get(0).toString(), "(1.0,3.0,5.5,8.0,10.0)");
+ }
+
+ @Test
+ public void medianTest() throws Exception
+ {
+ PigTest test = createPigTest("datafu/stats/medianTest.pig");
+
+ String[] input = {"4","5","6","9","10","7","8","2","3","1"};
+ writeLinesToFile("input", input);
+
+ test.runScript();
+
+ List<Tuple> output = getLinesForAlias(test, "data_out", true);
+
+ assertEquals(output.size(),1);
+ assertEquals(output.get(0).toString(), "(5.5)");
+ }
+
+ @Test
+ public void streamingMedianTest() throws Exception
+ {
+ PigTest test = createPigTest("datafu/stats/streamingMedianTest.pig");
+
+ String[] input = {"0","4","5","6","9","10","7","8","2","3","1"};
+ writeLinesToFile("input", input);
+
+ test.runScript();
+
+ List<Tuple> output = getLinesForAlias(test, "data_out", true);
+
+ assertEquals(output.size(),1);
+ assertEquals(output.get(0).toString(), "(5.0)");
+ }
+
+ @Test
+ public void streamingQuantileTest() throws Exception {
+ PigTest test = createPigTest("datafu/stats/streamingQuantileTest.pig",
+ "QUANTILES='5'");
+
+ String[] input = {"1","2","3","4","10","5","6","7","8","9"};
+ writeLinesToFile("input", input);
+
+ test.runScript();
+
+ List<Tuple> output = getLinesForAlias(test, "data_out", true);
+
+ assertEquals(output.size(),1);
+ assertEquals(output.get(0).toString(), "(1.0,3.0,5.0,8.0,10.0)");
+ }
+
+ @Test
+ public void streamingQuantile2Test() throws Exception {
+ PigTest test = createPigTest("datafu/stats/streamingQuantileTest.pig",
+ "QUANTILES='0.5','0.75','1.0'");
+
+ String[] input = {"1","2","3","4","10","5","6","7","8","9"};
+ writeLinesToFile("input", input);
+
+ test.runScript();
+
+ List<Tuple> output = getLinesForAlias(test, "data_out", true);
+
+ assertEquals(output.size(),1);
+ assertEquals(output.get(0).toString(), "(5.0,8.0,10.0)");
+ }
+
+ @Test
+ public void streamingQuantile3Test() throws Exception {
+ PigTest test = createPigTest("datafu/stats/streamingQuantileTest.pig",
+ "QUANTILES='0.07','0.03','0.37','1.0','0.0'");
+
+ List<String> input = new ArrayList<String>();
+ for (int i=1000; i>=1; i--)
+ {
+ input.add(Integer.toString(i));
+ }
+
+ writeLinesToFile("input", input.toArray(new String[0]));
+
+ test.runScript();
+
+ List<Tuple> output = getLinesForAlias(test, "data_out", true);
+
+ assertEquals(output.size(),1);
+ assertEquals(output.get(0).toString(), "(70.0,30.0,370.0,1000.0,1.0)");
+ }
+
+ @Test
+ public void streamingQuantile4Test() throws Exception {
+ PigTest test = createPigTest("datafu/stats/streamingQuantileTest.pig",
+ "QUANTILES='0.0013','0.0228','0.1587','0.5','0.8413','0.9772','0.9987'");
+
+ List<String> input = new ArrayList<String>();
+ for (int i=100000; i>=0; i--)
+ {
+ input.add(Integer.toString(i));
+ }
+
+ writeLinesToFile("input", input.toArray(new String[0]));
+
+ test.runScript();
+
+ List<Tuple> output = getLinesForAlias(test, "data_out", true);
+
+ assertEquals(output.size(),1);
+ assertEquals(output.get(0).toString(), "(130.0,2280.0,15870.0,50000.0,84130.0,97720.0,99870.0)");
+ }
+
+
+
+ @Test
+ public void quantile3Test() throws Exception {
+ PigTest test = createPigTest("datafu/stats/quantileTest.pig",
+ "QUANTILES='0.0013','0.0228','0.1587','0.5','0.8413','0.9772','0.9987'");
+
+ List<String> input = new ArrayList<String>();
+ for (int i=100000; i>=0; i--)
+ {
+ input.add(Integer.toString(i));
+ }
+
+ writeLinesToFile("input", input.toArray(new String[0]));
+
+ test.runScript();
+
+ List<Tuple> output = getLinesForAlias(test, "data_out", true);
+
+ assertEquals(output.size(),1);
+ assertEquals(output.get(0).toString(), "(130.0,2280.0,15870.0,50000.0,84130.0,97720.0,99870.0)");
+ }
+}
diff --git a/bigtop-tests/test-artifacts/datafu/src/main/groovy/org/apache/bigtop/itest/datafu/stats/WilsonBinConfTests.java b/bigtop-tests/test-artifacts/datafu/src/main/groovy/org/apache/bigtop/itest/datafu/stats/WilsonBinConfTests.java
new file mode 100644
index 00000000..32121fb7
--- /dev/null
+++ b/bigtop-tests/test-artifacts/datafu/src/main/groovy/org/apache/bigtop/itest/datafu/stats/WilsonBinConfTests.java
@@ -0,0 +1,63 @@
+package org.apache.bigtop.itest.datafu.stats;
+
+import static org.junit.Assert.*;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+
+import org.apache.pig.data.Tuple;
+import org.apache.pig.pigunit.PigTest;
+import org.junit.Test;
+
+import org.apache.bigtop.itest.datafu.PigTests;
+
+public class WilsonBinConfTests extends PigTests
+{
+ @Test
+ public void wilsonTest() throws Exception
+ {
+ PigTest test = createPigTest("datafu/stats/wilsonBinConfTests.pig",
+ "alpha=0.05"); // alpha is 0.05 for 95% confidence
+
+ writeLinesToFile("input",
+ "1\t1",
+ "1\t2",
+ "50\t100",
+ "500\t1000",
+ "999\t1000",
+ "1000\t1000",
+ "998\t1000");
+
+ test.runScript();
+
+ /* Add expected values, computed using R:
+ *
+ * e.g.
+ *
+ * library(Hmisc)
+ *
+ * binconf(50,100)
+ * binconf(500,1000)
+ *
+ */
+ List<String> expectedOutput = new ArrayList<String>();
+ expectedOutput.add("0.05129,1.00000");
+ expectedOutput.add("0.02565,0.97435");
+ expectedOutput.add("0.40383,0.59617");
+ expectedOutput.add("0.46907,0.53093");
+ expectedOutput.add("0.99436,0.99995");
+ expectedOutput.add("0.99617,1.00000");
+ expectedOutput.add("0.99274,0.99945");
+
+ List<Tuple> output = this.getLinesForAlias(test, "data_out");
+ Iterator<String> expectationIterator = expectedOutput.iterator();
+ for (Tuple t : output)
+ {
+ assertTrue(expectationIterator.hasNext());
+ Double lower = (Double)t.get(0);
+ Double upper = (Double)t.get(1);
+ assertEquals(String.format("%.5f,%.5f",lower,upper),expectationIterator.next());
+ }
+ }
+}
diff --git a/bigtop-tests/test-artifacts/datafu/src/main/groovy/org/apache/bigtop/itest/datafu/urls/UserAgentTest.java b/bigtop-tests/test-artifacts/datafu/src/main/groovy/org/apache/bigtop/itest/datafu/urls/UserAgentTest.java
new file mode 100644
index 00000000..33a919a6
--- /dev/null
+++ b/bigtop-tests/test-artifacts/datafu/src/main/groovy/org/apache/bigtop/itest/datafu/urls/UserAgentTest.java
@@ -0,0 +1,39 @@
+package org.apache.bigtop.itest.datafu.urls;
+
+import org.apache.pig.pigunit.PigTest;
+import org.junit.Test;
+
+import org.apache.bigtop.itest.datafu.PigTests;
+
+public class UserAgentTest extends PigTests
+{
+
+ @Test
+ public void userAgentTest() throws Exception
+ {
+ PigTest test = createPigTest("datafu/urls/userAgentTest.pig");
+
+ String[] input = {
+ "Mozilla/5.0 (iPhone; U; CPU iPhone OS 4_3_3 like Mac OS X; en-us) AppleWebKit/533.17.9 (KHTML, like Gecko) Version/5.0.2 Mobile/8J2 Safari/6533.18.5",
+ "Mozilla/5.0 (compatible; Konqueror/3.5; Linux; X11; de) KHTML/3.5.2 (like Gecko) Kubuntu 6.06 Dapper",
+ "Mozilla/5.0 (Windows NT 6.1; WOW64; rv:2.2a1pre) Gecko/20110331 Firefox/4.2a1pre Fennec/4.1a1pre",
+ "Opera/9.00 (X11; Linux i686; U; en)",
+ "Wget/1.10.2",
+ "Opera/9.80 (Android; Linux; Opera Mobi/ADR-1012221546; U; pl) Presto/2.7.60 Version/10.5",
+ "Mozilla/5.0 (Linux; U; Android 2.2; en-us; DROID2 Build/VZW) AppleWebKit/533.1 (KHTML, like Gecko) Version/4.0 Mobile Safari/533.1"
+ };
+
+ String[] output = {
+ "(mobile)",
+ "(desktop)",
+ "(mobile)",
+ "(desktop)",
+ "(desktop)",
+ "(mobile)",
+ "(mobile)",
+ };
+
+ test.assertOutput("data",input,"data_out",output);
+ }
+
+}
diff --git a/bigtop-tests/test-artifacts/datafu/src/main/groovy/org/apache/bigtop/itest/datafu/util/AssertTests.java b/bigtop-tests/test-artifacts/datafu/src/main/groovy/org/apache/bigtop/itest/datafu/util/AssertTests.java
new file mode 100644
index 00000000..e4f5c1b2
--- /dev/null
+++ b/bigtop-tests/test-artifacts/datafu/src/main/groovy/org/apache/bigtop/itest/datafu/util/AssertTests.java
@@ -0,0 +1,75 @@
+package org.apache.bigtop.itest.datafu.util;
+
+import static org.junit.Assert.*;
+
+import org.apache.pig.pigunit.PigTest;
+import org.junit.Test;
+
+import org.apache.bigtop.itest.datafu.PigTests;
+
+public class AssertTests extends PigTests
+{
+ @Test
+ public void shouldAssertWithMessageOnZero() throws Exception
+ {
+ try
+ {
+ PigTest test = createPigTest("datafu/util/assertWithMessageTest.pig");
+
+ this.writeLinesToFile("input", "0");
+
+ test.runScript();
+
+ this.getLinesForAlias(test, "data2");
+
+ fail("test should have failed, but it didn't");
+ }
+ catch (Exception e)
+ {
+ }
+ }
+
+ @Test
+ public void shouldNotAssertWithMessageOnOne() throws Exception
+ {
+ PigTest test = createPigTest("datafu/util/assertWithMessageTest.pig");
+
+ this.writeLinesToFile("input", "1");
+
+ test.runScript();
+
+ this.getLinesForAlias(test, "data2");
+ }
+
+ @Test
+ public void shouldAssertWithoutMessageOnZero() throws Exception
+ {
+ try
+ {
+ PigTest test = createPigTest("datafu/util/assertWithoutMessageTest.pig");
+
+ this.writeLinesToFile("input", "0");
+
+ test.runScript();
+
+ this.getLinesForAlias(test, "data2");
+
+ fail("test should have failed, but it didn't");
+ }
+ catch (Exception e)
+ {
+ }
+ }
+
+ @Test
+ public void shouldNotAssertWithoutMessageOnOne() throws Exception
+ {
+ PigTest test = createPigTest("datafu/util/assertWithoutMessageTest.pig");
+
+ this.writeLinesToFile("input", "1");
+
+ test.runScript();
+
+ this.getLinesForAlias(test, "data2");
+ }
+}
diff --git a/bigtop-tests/test-artifacts/datafu/src/main/groovy/org/apache/bigtop/itest/datafu/util/IntBoolConversionPigTests.java b/bigtop-tests/test-artifacts/datafu/src/main/groovy/org/apache/bigtop/itest/datafu/util/IntBoolConversionPigTests.java
new file mode 100644
index 00000000..505783aa
--- /dev/null
+++ b/bigtop-tests/test-artifacts/datafu/src/main/groovy/org/apache/bigtop/itest/datafu/util/IntBoolConversionPigTests.java
@@ -0,0 +1,59 @@
+package org.apache.bigtop.itest.datafu.util;
+
+import org.apache.pig.pigunit.PigTest;
+import org.junit.Test;
+
+import org.apache.bigtop.itest.datafu.PigTests;
+
+public class IntBoolConversionPigTests extends PigTests
+{
+ @Test
+ public void intToBoolTest() throws Exception
+ {
+ PigTest test = createPigTest("datafu/util/intToBoolTest.pig");
+
+ String[] input = {
+ "", // null
+ "0",
+ "1"
+ };
+
+ String[] output = {
+ "(false)",
+ "(false)",
+ "(true)"
+ };
+
+ test.assertOutput("data",input,"data2",output);
+ }
+
+ @Test
+ public void intToBoolToIntTest() throws Exception
+ {
+ PigTest test = createPigTest("datafu/util/intToBoolToIntTest.pig");
+
+ String[] input = {
+ "", // null
+ "0",
+ "1",
+ "2",
+ "-1",
+ "-2",
+ "0",
+ ""
+ };
+
+ String[] output = {
+ "(0)",
+ "(0)",
+ "(1)",
+ "(1)",
+ "(1)",
+ "(1)",
+ "(0)",
+ "(0)"
+ };
+
+ test.assertOutput("data",input,"data3",output);
+ }
+}
diff --git a/bigtop-tests/test-artifacts/datafu/src/main/resources/datafu/bags/aliasBagFieldsTest.pig b/bigtop-tests/test-artifacts/datafu/src/main/resources/datafu/bags/aliasBagFieldsTest.pig
new file mode 100644
index 00000000..247c832a
--- /dev/null
+++ b/bigtop-tests/test-artifacts/datafu/src/main/resources/datafu/bags/aliasBagFieldsTest.pig
@@ -0,0 +1,20 @@
+register $JAR_PATH
+
+define AliasBagFields datafu.pig.bags.AliasBagFields('[a#alpha,b#numeric]');
+
+data = LOAD 'input' AS (data: bag {T: tuple(a:CHARARRAY, b:INT, c:INT)});
+
+data2 = FOREACH data GENERATE AliasBagFields(data) as data;
+
+describe data2;
+
+data3 = FOREACH data2 GENERATE FLATTEN(data);
+
+describe data3;
+
+data4 = FOREACH data3 GENERATE data::alpha, data::numeric;
+
+describe data4;
+
+STORE data4 INTO 'output';
+
diff --git a/bigtop-tests/test-artifacts/datafu/src/main/resources/datafu/bags/appendToBagTest.pig b/bigtop-tests/test-artifacts/datafu/src/main/resources/datafu/bags/appendToBagTest.pig
new file mode 100644
index 00000000..d906bc46
--- /dev/null
+++ b/bigtop-tests/test-artifacts/datafu/src/main/resources/datafu/bags/appendToBagTest.pig
@@ -0,0 +1,9 @@
+register $JAR_PATH
+
+define AppendToBag datafu.pig.bags.AppendToBag();
+
+data = LOAD 'input' AS (key:INT, B: bag{T: tuple(v:INT)}, T: tuple(v:INT));
+
+data2 = FOREACH data GENERATE key, AppendToBag(B,T) as B;
+
+STORE data2 INTO 'output';
diff --git a/bigtop-tests/test-artifacts/datafu/src/main/resources/datafu/bags/bagConcatTest.pig b/bigtop-tests/test-artifacts/datafu/src/main/resources/datafu/bags/bagConcatTest.pig
new file mode 100644
index 00000000..30d46a0d
--- /dev/null
+++ b/bigtop-tests/test-artifacts/datafu/src/main/resources/datafu/bags/bagConcatTest.pig
@@ -0,0 +1,11 @@
+register $JAR_PATH
+
+define BagConcat datafu.pig.bags.BagConcat();
+
+data = LOAD 'input' AS (A: bag{T: tuple(v:INT)}, B: bag{T: tuple(v:INT)}, C: bag{T: tuple(v:INT)});
+
+data2 = FOREACH data GENERATE BagConcat(A,B,C);
+
+describe data2
+
+STORE data2 INTO 'output'; \ No newline at end of file
diff --git a/bigtop-tests/test-artifacts/datafu/src/main/resources/datafu/bags/bagSplitTest.pig b/bigtop-tests/test-artifacts/datafu/src/main/resources/datafu/bags/bagSplitTest.pig
new file mode 100644
index 00000000..ee4f5381
--- /dev/null
+++ b/bigtop-tests/test-artifacts/datafu/src/main/resources/datafu/bags/bagSplitTest.pig
@@ -0,0 +1,14 @@
+register $JAR_PATH
+
+define BagSplit datafu.pig.bags.BagSplit();
+
+data = LOAD 'input' AS (B:bag{T:tuple(val1:INT,val2:INT)});
+
+data2 = FOREACH data GENERATE BagSplit($MAX,B);
+describe data2;
+
+data3 = FOREACH data2 GENERATE FLATTEN($0);
+
+describe data3
+
+STORE data3 INTO 'output';
diff --git a/bigtop-tests/test-artifacts/datafu/src/main/resources/datafu/bags/bagSplitWithBagNumTest.pig b/bigtop-tests/test-artifacts/datafu/src/main/resources/datafu/bags/bagSplitWithBagNumTest.pig
new file mode 100644
index 00000000..833e912d
--- /dev/null
+++ b/bigtop-tests/test-artifacts/datafu/src/main/resources/datafu/bags/bagSplitWithBagNumTest.pig
@@ -0,0 +1,11 @@
+register $JAR_PATH
+
+define BagSplit datafu.pig.bags.BagSplit('true');
+
+data = LOAD 'input' AS (B:bag{T:tuple(val1:INT,val2:INT)});
+
+data2 = FOREACH data GENERATE BagSplit($MAX,B);
+
+data3 = FOREACH data2 GENERATE FLATTEN($0);
+
+STORE data3 INTO 'output'; \ No newline at end of file
diff --git a/bigtop-tests/test-artifacts/datafu/src/main/resources/datafu/bags/comprehensiveBagSplitAndEnumerate.pig b/bigtop-tests/test-artifacts/datafu/src/main/resources/datafu/bags/comprehensiveBagSplitAndEnumerate.pig
new file mode 100644
index 00000000..88d73928
--- /dev/null
+++ b/bigtop-tests/test-artifacts/datafu/src/main/resources/datafu/bags/comprehensiveBagSplitAndEnumerate.pig
@@ -0,0 +1,26 @@
+register $JAR_PATH
+
+define BagSplit datafu.pig.bags.BagSplit();
+define Enumerate datafu.pig.bags.Enumerate('1');
+
+data = LOAD 'input' AS (data: bag {T: tuple(name:CHARARRAY, score:double)});
+
+data2 = FOREACH data GENERATE BagSplit(3,data) as the_bags;
+
+describe data2
+
+data3 = FOREACH data2 GENERATE Enumerate(the_bags) as enumerated_bags;
+
+describe data3
+
+data4 = FOREACH data3 GENERATE FLATTEN(enumerated_bags) as (data,i);
+
+describe data4
+
+data5 = FOREACH data4 GENERATE data as the_data, i as the_key;
+
+describe data5
+
+data_out = FOREACH data5 GENERATE FLATTEN(the_data), the_key;
+
+describe data_out \ No newline at end of file
diff --git a/bigtop-tests/test-artifacts/datafu/src/main/resources/datafu/bags/distinctByTest.pig b/bigtop-tests/test-artifacts/datafu/src/main/resources/datafu/bags/distinctByTest.pig
new file mode 100644
index 00000000..9532d07c
--- /dev/null
+++ b/bigtop-tests/test-artifacts/datafu/src/main/resources/datafu/bags/distinctByTest.pig
@@ -0,0 +1,12 @@
+register $JAR_PATH
+
+define DistinctBy datafu.pig.bags.DistinctBy('0');
+
+data = LOAD 'input' AS (data: bag {T: tuple(a:CHARARRAY, b:INT, c:INT)});
+
+data2 = FOREACH data GENERATE DistinctBy(data);
+
+describe data2;
+
+STORE data2 INTO 'output';
+
diff --git a/bigtop-tests/test-artifacts/datafu/src/main/resources/datafu/bags/enumerateTest.pig b/bigtop-tests/test-artifacts/datafu/src/main/resources/datafu/bags/enumerateTest.pig
new file mode 100644
index 00000000..1647485c
--- /dev/null
+++ b/bigtop-tests/test-artifacts/datafu/src/main/resources/datafu/bags/enumerateTest.pig
@@ -0,0 +1,16 @@
+register $JAR_PATH
+
+define Enumerate datafu.pig.bags.Enumerate();
+
+data = LOAD 'input' AS (data: bag {T: tuple(v1:INT,B: bag{T: tuple(v2:INT)})});
+
+data2 = FOREACH data GENERATE Enumerate(data);
+describe data2;
+
+data3 = FOREACH data2 GENERATE FLATTEN($0);
+describe data3;
+
+data4 = FOREACH data3 GENERATE $0 as v1, $1 as B, $2 as i;
+describe data4;
+
+STORE data4 INTO 'output';
diff --git a/bigtop-tests/test-artifacts/datafu/src/main/resources/datafu/bags/enumerateWithReverseTest.pig b/bigtop-tests/test-artifacts/datafu/src/main/resources/datafu/bags/enumerateWithReverseTest.pig
new file mode 100644
index 00000000..1f04b041
--- /dev/null
+++ b/bigtop-tests/test-artifacts/datafu/src/main/resources/datafu/bags/enumerateWithReverseTest.pig
@@ -0,0 +1,16 @@
+register $JAR_PATH
+
+define Enumerate datafu.pig.bags.Enumerate('1', 'true');
+
+data = LOAD 'input' AS (data: bag {T: tuple(v1:INT,B: bag{T: tuple(v2:INT)})});
+
+data2 = FOREACH data GENERATE Enumerate(data);
+describe data2;
+
+data3 = FOREACH data2 GENERATE FLATTEN($0);
+describe data3;
+
+data4 = FOREACH data3 GENERATE $0 as v1, $1 as B, $2 as i;
+describe data4;
+
+STORE data4 INTO 'output';
diff --git a/bigtop-tests/test-artifacts/datafu/src/main/resources/datafu/bags/enumerateWithStartTest.pig b/bigtop-tests/test-artifacts/datafu/src/main/resources/datafu/bags/enumerateWithStartTest.pig
new file mode 100644
index 00000000..d288a6e8
--- /dev/null
+++ b/bigtop-tests/test-artifacts/datafu/src/main/resources/datafu/bags/enumerateWithStartTest.pig
@@ -0,0 +1,16 @@
+register $JAR_PATH
+
+define Enumerate datafu.pig.bags.Enumerate('1');
+
+data = LOAD 'input' AS (data: bag {T: tuple(v1:INT,B: bag{T: tuple(v2:INT)})});
+
+data2 = FOREACH data GENERATE Enumerate(data);
+describe data2;
+
+data3 = FOREACH data2 GENERATE FLATTEN($0);
+describe data3;
+
+data4 = FOREACH data3 GENERATE $0 as v1, $1 as B, $2 as i;
+describe data4;
+
+STORE data4 INTO 'output';
diff --git a/bigtop-tests/test-artifacts/datafu/src/main/resources/datafu/bags/firstTupleFromBagTest.pig b/bigtop-tests/test-artifacts/datafu/src/main/resources/datafu/bags/firstTupleFromBagTest.pig
new file mode 100644
index 00000000..921787ec
--- /dev/null
+++ b/bigtop-tests/test-artifacts/datafu/src/main/resources/datafu/bags/firstTupleFromBagTest.pig
@@ -0,0 +1,9 @@
+register $JAR_PATH
+
+define FirstTupleFromBag datafu.pig.bags.FirstTupleFromBag();
+
+data = LOAD 'input' AS (key:INT, B: bag{T: tuple(v:INT)});
+
+data2 = FOREACH data GENERATE key, FirstTupleFromBag(B, null) as B;
+
+STORE data2 INTO 'output';
diff --git a/bigtop-tests/test-artifacts/datafu/src/main/resources/datafu/bags/nullToEmptyBagTest.pig b/bigtop-tests/test-artifacts/datafu/src/main/resources/datafu/bags/nullToEmptyBagTest.pig
new file mode 100644
index 00000000..3e809b36
--- /dev/null
+++ b/bigtop-tests/test-artifacts/datafu/src/main/resources/datafu/bags/nullToEmptyBagTest.pig
@@ -0,0 +1,14 @@
+register $JAR_PATH
+
+define NullToEmptyBag datafu.pig.bags.NullToEmptyBag();
+
+data = LOAD 'input' AS (B: bag {T: tuple(v:INT)});
+
+dump data;
+
+data2 = FOREACH data GENERATE NullToEmptyBag(B) as P;
+
+dump data2;
+
+STORE data2 INTO 'output';
+
diff --git a/bigtop-tests/test-artifacts/datafu/src/main/resources/datafu/bags/prependToBagTest.pig b/bigtop-tests/test-artifacts/datafu/src/main/resources/datafu/bags/prependToBagTest.pig
new file mode 100644
index 00000000..c8523464
--- /dev/null
+++ b/bigtop-tests/test-artifacts/datafu/src/main/resources/datafu/bags/prependToBagTest.pig
@@ -0,0 +1,9 @@
+register $JAR_PATH
+
+define PrependToBag datafu.pig.bags.PrependToBag();
+
+data = LOAD 'input' AS (key:INT, B: bag{T: tuple(v:INT)}, T: tuple(v:INT));
+
+data2 = FOREACH data GENERATE key, PrependToBag(B,T) as B;
+
+STORE data2 INTO 'output';
diff --git a/bigtop-tests/test-artifacts/datafu/src/main/resources/datafu/bags/sets/setIntersectTest.pig b/bigtop-tests/test-artifacts/datafu/src/main/resources/datafu/bags/sets/setIntersectTest.pig
new file mode 100644
index 00000000..6f590e8c
--- /dev/null
+++ b/bigtop-tests/test-artifacts/datafu/src/main/resources/datafu/bags/sets/setIntersectTest.pig
@@ -0,0 +1,9 @@
+register $JAR_PATH
+
+define SetIntersect datafu.pig.bags.sets.SetIntersect();
+
+data = LOAD 'input' AS (B1:bag{T:tuple(val1:int,val2:int)},B2:bag{T:tuple(val1:int,val2:int)});
+
+data2 = FOREACH data GENERATE SetIntersect(B1,B2);
+
+STORE data2 INTO 'output';
diff --git a/bigtop-tests/test-artifacts/datafu/src/main/resources/datafu/bags/sets/setUnionTest.pig b/bigtop-tests/test-artifacts/datafu/src/main/resources/datafu/bags/sets/setUnionTest.pig
new file mode 100644
index 00000000..a5e1c4d6
--- /dev/null
+++ b/bigtop-tests/test-artifacts/datafu/src/main/resources/datafu/bags/sets/setUnionTest.pig
@@ -0,0 +1,13 @@
+register $JAR_PATH
+
+define SetUnion datafu.pig.bags.sets.SetUnion();
+
+data = LOAD 'input' AS (B1:bag{T:tuple(val1:int,val2:int)},B2:bag{T:tuple(val1:int,val2:int)});
+
+dump data
+
+data2 = FOREACH data GENERATE SetUnion(B1,B2);
+
+dump data2
+
+STORE data2 INTO 'output';
diff --git a/bigtop-tests/test-artifacts/datafu/src/main/resources/datafu/bags/unorderedPairsTests.pig b/bigtop-tests/test-artifacts/datafu/src/main/resources/datafu/bags/unorderedPairsTests.pig
new file mode 100644
index 00000000..1bf68bd2
--- /dev/null
+++ b/bigtop-tests/test-artifacts/datafu/src/main/resources/datafu/bags/unorderedPairsTests.pig
@@ -0,0 +1,16 @@
+register $JAR_PATH
+
+define UnorderedPairs datafu.pig.bags.UnorderedPairs();
+
+data = LOAD 'input' AS (B: bag {T: tuple(v:INT)});
+
+data2 = FOREACH data GENERATE UnorderedPairs(B) as P;
+
+data3 = FOREACH data2 GENERATE FLATTEN(P);
+
+data4 = FOREACH data3 GENERATE FLATTEN(elem1), FLATTEN(elem2);
+
+data5 = ORDER data4 BY $0, $1;
+
+STORE data5 INTO 'output';
+
diff --git a/bigtop-tests/test-artifacts/datafu/src/main/resources/datafu/bags/unorderedPairsTests2.pig b/bigtop-tests/test-artifacts/datafu/src/main/resources/datafu/bags/unorderedPairsTests2.pig
new file mode 100644
index 00000000..aada011c
--- /dev/null
+++ b/bigtop-tests/test-artifacts/datafu/src/main/resources/datafu/bags/unorderedPairsTests2.pig
@@ -0,0 +1,12 @@
+register $JAR_PATH
+
+define UnorderedPairs datafu.pig.bags.UnorderedPairs();
+
+data = LOAD 'input' AS (A:int, B: bag {T: tuple(v:INT)});
+
+data2 = FOREACH data GENERATE A, UnorderedPairs(B) as P;
+
+data3 = FOREACH data2 GENERATE A, FLATTEN(P);
+
+STORE data3 INTO 'output';
+
diff --git a/bigtop-tests/test-artifacts/datafu/src/main/resources/datafu/date/timeCountPageViewsTest.pig b/bigtop-tests/test-artifacts/datafu/src/main/resources/datafu/date/timeCountPageViewsTest.pig
new file mode 100644
index 00000000..1e23a411
--- /dev/null
+++ b/bigtop-tests/test-artifacts/datafu/src/main/resources/datafu/date/timeCountPageViewsTest.pig
@@ -0,0 +1,13 @@
+register $JAR_PATH
+
+define TimeCount datafu.pig.date.TimeCount('$TIME_WINDOW');
+
+views = LOAD 'input' AS (user_id:int, page_id:int, time:chararray);
+
+views_grouped = GROUP views BY (user_id, page_id);
+view_counts = foreach views_grouped {
+ views = order views by time;
+ generate group.user_id as user_id, group.page_id as page_id, TimeCount(views.(time)) as count;
+}
+
+STORE view_counts INTO 'output';
diff --git a/bigtop-tests/test-artifacts/datafu/src/main/resources/datafu/geo/haversineTest.pig b/bigtop-tests/test-artifacts/datafu/src/main/resources/datafu/geo/haversineTest.pig
new file mode 100644
index 00000000..e52cc1f2
--- /dev/null
+++ b/bigtop-tests/test-artifacts/datafu/src/main/resources/datafu/geo/haversineTest.pig
@@ -0,0 +1,9 @@
+register $JAR_PATH
+
+define HaversineDistInMiles datafu.pig.geo.HaversineDistInMiles();
+
+data = LOAD 'input' AS (lat1:double,lng1:double,lat2:double,lng2:double);
+
+data2 = FOREACH data GENERATE HaversineDistInMiles(lat1,lng1,lat2,lng2);
+
+STORE data2 INTO 'output';
diff --git a/bigtop-tests/test-artifacts/datafu/src/main/resources/datafu/hash/md5Base64Test.pig b/bigtop-tests/test-artifacts/datafu/src/main/resources/datafu/hash/md5Base64Test.pig
new file mode 100644
index 00000000..5a12c2e7
--- /dev/null
+++ b/bigtop-tests/test-artifacts/datafu/src/main/resources/datafu/hash/md5Base64Test.pig
@@ -0,0 +1,9 @@
+register $JAR_PATH
+
+define MD5 datafu.pig.hash.MD5Base64();
+
+data_in = LOAD 'input' as (val:chararray);
+
+data_out = FOREACH data_in GENERATE MD5(val) as val;
+
+STORE data_out INTO 'output'; \ No newline at end of file
diff --git a/bigtop-tests/test-artifacts/datafu/src/main/resources/datafu/hash/md5Test.pig b/bigtop-tests/test-artifacts/datafu/src/main/resources/datafu/hash/md5Test.pig
new file mode 100644
index 00000000..3fc6aaa0
--- /dev/null
+++ b/bigtop-tests/test-artifacts/datafu/src/main/resources/datafu/hash/md5Test.pig
@@ -0,0 +1,9 @@
+register $JAR_PATH
+
+define MD5 datafu.pig.hash.MD5();
+
+data_in = LOAD 'input' as (val:chararray);
+
+data_out = FOREACH data_in GENERATE MD5(val) as val;
+
+STORE data_out INTO 'output'; \ No newline at end of file
diff --git a/bigtop-tests/test-artifacts/datafu/src/main/resources/datafu/linkanalysis/pageRankTest.pig b/bigtop-tests/test-artifacts/datafu/src/main/resources/datafu/linkanalysis/pageRankTest.pig
new file mode 100644
index 00000000..a0e439c2
--- /dev/null
+++ b/bigtop-tests/test-artifacts/datafu/src/main/resources/datafu/linkanalysis/pageRankTest.pig
@@ -0,0 +1,25 @@
+register $JAR_PATH
+
+/* Need to enable dangling node handling since the Wikipedia example has them,
+ otherwise the ranks won't be right. */
+define PageRank datafu.pig.linkanalysis.PageRank('dangling_nodes','true');
+
+data = LOAD 'input' AS (topic:INT,source:INT,dest:INT,weight:DOUBLE);
+
+data_grouped = GROUP data by (topic,source);
+
+data_grouped = foreach data_grouped {
+ generate group.topic as topic, group.source as source, data.(dest,weight) as edges;
+};
+
+data_grouped2 = GROUP data_grouped by topic;
+data_grouped2 = foreach data_grouped2 {
+ generate group as topic, FLATTEN(PageRank(data_grouped.(source,edges))) as (source,rank);
+};
+
+data_grouped3 = FOREACH data_grouped2 GENERATE
+ topic,
+ source,
+ rank;
+
+STORE data_grouped3 INTO 'output';
diff --git a/bigtop-tests/test-artifacts/datafu/src/main/resources/datafu/numbers/randomIntRangeTest.pig b/bigtop-tests/test-artifacts/datafu/src/main/resources/datafu/numbers/randomIntRangeTest.pig
new file mode 100644
index 00000000..3ca45c73
--- /dev/null
+++ b/bigtop-tests/test-artifacts/datafu/src/main/resources/datafu/numbers/randomIntRangeTest.pig
@@ -0,0 +1,8 @@
+register $JAR_PATH
+
+define RandInt datafu.pig.numbers.RandInt();
+
+data = LOAD 'input' AS (key:INT);
+data2 = FOREACH data GENERATE key, RandInt($MIN,$MAX) as val;
+
+STORE data2 INTO 'output';
diff --git a/bigtop-tests/test-artifacts/datafu/src/main/resources/datafu/sessions/sessionizeTest.pig b/bigtop-tests/test-artifacts/datafu/src/main/resources/datafu/sessions/sessionizeTest.pig
new file mode 100644
index 00000000..6a4939ee
--- /dev/null
+++ b/bigtop-tests/test-artifacts/datafu/src/main/resources/datafu/sessions/sessionizeTest.pig
@@ -0,0 +1,17 @@
+register $JAR_PATH
+
+define Sessionize datafu.pig.sessions.Sessionize('$TIME_WINDOW');
+
+views = LOAD 'input' AS (time:chararray, user_id:int, value:int);
+
+views_grouped = GROUP views BY user_id;
+view_counts = FOREACH views_grouped {
+ views = ORDER views BY time;
+ GENERATE flatten(Sessionize(views)) as (time,user_id,value,session_id);
+}
+
+max_value = GROUP view_counts BY (user_id, session_id);
+
+max_value = FOREACH max_value GENERATE group.user_id, MAX(view_counts.value) AS val;
+
+STORE max_value INTO 'output';
diff --git a/bigtop-tests/test-artifacts/datafu/src/main/resources/datafu/stats/markovPairDefault.pig b/bigtop-tests/test-artifacts/datafu/src/main/resources/datafu/stats/markovPairDefault.pig
new file mode 100644
index 00000000..a121cb12
--- /dev/null
+++ b/bigtop-tests/test-artifacts/datafu/src/main/resources/datafu/stats/markovPairDefault.pig
@@ -0,0 +1,14 @@
+register $JAR_PATH
+
+define markovPairs datafu.pig.stats.MarkovPairs();
+
+data = load 'input' as $schema;
+describe data;
+
+data_out1 = foreach data generate data as orig_bag;
+describe data_out1;
+
+data_out = foreach data_out1 generate markovPairs(orig_bag) as markov_bag;
+describe data_out;
+
+store data_out into 'output'; \ No newline at end of file
diff --git a/bigtop-tests/test-artifacts/datafu/src/main/resources/datafu/stats/markovPairLookahead.pig b/bigtop-tests/test-artifacts/datafu/src/main/resources/datafu/stats/markovPairLookahead.pig
new file mode 100644
index 00000000..269a1bc7
--- /dev/null
+++ b/bigtop-tests/test-artifacts/datafu/src/main/resources/datafu/stats/markovPairLookahead.pig
@@ -0,0 +1,14 @@
+register $JAR_PATH
+
+define markovPairs datafu.pig.stats.MarkovPairs('$lookahead');
+
+data = load 'input' as $schema;
+describe data;
+
+data_out1 = foreach data generate data as orig_bag;
+describe data_out1;
+
+data_out = foreach data_out1 generate markovPairs(orig_bag) as markov_bag;
+describe data_out;
+
+store data_out into 'output'; \ No newline at end of file
diff --git a/bigtop-tests/test-artifacts/datafu/src/main/resources/datafu/stats/medianTest.pig b/bigtop-tests/test-artifacts/datafu/src/main/resources/datafu/stats/medianTest.pig
new file mode 100644
index 00000000..0a439cee
--- /dev/null
+++ b/bigtop-tests/test-artifacts/datafu/src/main/resources/datafu/stats/medianTest.pig
@@ -0,0 +1,21 @@
+register $JAR_PATH
+
+define Median datafu.pig.stats.Median();
+
+data_in = LOAD 'input' as (val:int);
+
+/*describe data_in;*/
+
+data_out = GROUP data_in ALL;
+
+/*describe data_out;*/
+
+data_out = FOREACH data_out {
+ sorted = ORDER data_in BY val;
+ GENERATE Median(sorted) as medians;
+}
+data_out = FOREACH data_out GENERATE FLATTEN(medians);
+
+/*describe data_out;*/
+
+STORE data_out into 'output'; \ No newline at end of file
diff --git a/bigtop-tests/test-artifacts/datafu/src/main/resources/datafu/stats/quantileTest.pig b/bigtop-tests/test-artifacts/datafu/src/main/resources/datafu/stats/quantileTest.pig
new file mode 100644
index 00000000..604d179c
--- /dev/null
+++ b/bigtop-tests/test-artifacts/datafu/src/main/resources/datafu/stats/quantileTest.pig
@@ -0,0 +1,21 @@
+register $JAR_PATH
+
+define Quantile datafu.pig.stats.Quantile($QUANTILES);
+
+data_in = LOAD 'input' as (val:int);
+
+/*describe data_in;*/
+
+data_out = GROUP data_in ALL;
+
+/*describe data_out;*/
+
+data_out = FOREACH data_out {
+ sorted = ORDER data_in BY val;
+ GENERATE Quantile(sorted) as quantiles;
+}
+data_out = FOREACH data_out GENERATE FLATTEN(quantiles);
+
+/*describe data_out;*/
+
+STORE data_out into 'output'; \ No newline at end of file
diff --git a/bigtop-tests/test-artifacts/datafu/src/main/resources/datafu/stats/streamingMedianTest.pig b/bigtop-tests/test-artifacts/datafu/src/main/resources/datafu/stats/streamingMedianTest.pig
new file mode 100644
index 00000000..27d64f38
--- /dev/null
+++ b/bigtop-tests/test-artifacts/datafu/src/main/resources/datafu/stats/streamingMedianTest.pig
@@ -0,0 +1,21 @@
+register $JAR_PATH
+
+define Median datafu.pig.stats.StreamingMedian();
+
+data_in = LOAD 'input' as (val:int);
+
+/*describe data_in;*/
+
+data_out = GROUP data_in ALL;
+
+/*describe data_out;*/
+
+data_out = FOREACH data_out {
+ sorted = ORDER data_in BY val;
+ GENERATE Median(sorted) as medians;
+}
+data_out = FOREACH data_out GENERATE FLATTEN(medians);
+
+/*describe data_out;*/
+
+STORE data_out into 'output'; \ No newline at end of file
diff --git a/bigtop-tests/test-artifacts/datafu/src/main/resources/datafu/stats/streamingQuantileTest.pig b/bigtop-tests/test-artifacts/datafu/src/main/resources/datafu/stats/streamingQuantileTest.pig
new file mode 100644
index 00000000..51c3bc5f
--- /dev/null
+++ b/bigtop-tests/test-artifacts/datafu/src/main/resources/datafu/stats/streamingQuantileTest.pig
@@ -0,0 +1,18 @@
+register $JAR_PATH
+
+define Quantile datafu.pig.stats.StreamingQuantile($QUANTILES);
+
+data_in = LOAD 'input' as (val:int);
+
+/*describe data_in;*/
+
+data_out = GROUP data_in ALL;
+
+/*describe data_out;*/
+
+data_out = FOREACH data_out GENERATE Quantile(data_in.val) as quantiles;
+data_out = FOREACH data_out GENERATE FLATTEN(quantiles);
+
+/*describe data_out;*/
+
+STORE data_out into 'output';
diff --git a/bigtop-tests/test-artifacts/datafu/src/main/resources/datafu/stats/wilsonBinConfTests.pig b/bigtop-tests/test-artifacts/datafu/src/main/resources/datafu/stats/wilsonBinConfTests.pig
new file mode 100644
index 00000000..19fa466f
--- /dev/null
+++ b/bigtop-tests/test-artifacts/datafu/src/main/resources/datafu/stats/wilsonBinConfTests.pig
@@ -0,0 +1,11 @@
+register $JAR_PATH
+
+define WilsonBinConf datafu.pig.stats.WilsonBinConf('$alpha');
+
+data = load 'input' as (successes:long, totals:long);
+describe data;
+
+data_out = FOREACH data GENERATE WilsonBinConf(successes, totals) as interval;
+data_out = FOREACH data_out GENERATE FLATTEN(interval);
+
+store data_out into 'output'; \ No newline at end of file
diff --git a/bigtop-tests/test-artifacts/datafu/src/main/resources/datafu/urls/userAgentTest.pig b/bigtop-tests/test-artifacts/datafu/src/main/resources/datafu/urls/userAgentTest.pig
new file mode 100644
index 00000000..45487551
--- /dev/null
+++ b/bigtop-tests/test-artifacts/datafu/src/main/resources/datafu/urls/userAgentTest.pig
@@ -0,0 +1,8 @@
+register $JAR_PATH
+
+define UserAgentClassify datafu.pig.urls.UserAgentClassify();
+
+data = load 'input' as (usr_agent:chararray);
+data_out = foreach data generate UserAgentClassify(usr_agent) as class;
+describe data_out;
+store data_out into 'output'; \ No newline at end of file
diff --git a/bigtop-tests/test-artifacts/datafu/src/main/resources/datafu/util/assertWithMessageTest.pig b/bigtop-tests/test-artifacts/datafu/src/main/resources/datafu/util/assertWithMessageTest.pig
new file mode 100644
index 00000000..f240987d
--- /dev/null
+++ b/bigtop-tests/test-artifacts/datafu/src/main/resources/datafu/util/assertWithMessageTest.pig
@@ -0,0 +1,10 @@
+register $JAR_PATH
+
+define ASSERT datafu.pig.util.ASSERT();
+
+data = LOAD 'input' AS (val:INT);
+
+data2 = FILTER data BY ASSERT(val,'assertion appears to have failed, doh!');
+
+STORE data2 INTO 'output';
+
diff --git a/bigtop-tests/test-artifacts/datafu/src/main/resources/datafu/util/assertWithoutMessageTest.pig b/bigtop-tests/test-artifacts/datafu/src/main/resources/datafu/util/assertWithoutMessageTest.pig
new file mode 100644
index 00000000..c6368e7b
--- /dev/null
+++ b/bigtop-tests/test-artifacts/datafu/src/main/resources/datafu/util/assertWithoutMessageTest.pig
@@ -0,0 +1,10 @@
+register $JAR_PATH
+
+define ASSERT datafu.pig.util.ASSERT();
+
+data = LOAD 'input' AS (val:INT);
+
+data2 = FILTER data BY ASSERT(val);
+
+STORE data2 INTO 'output';
+
diff --git a/bigtop-tests/test-artifacts/datafu/src/main/resources/datafu/util/intToBoolTest.pig b/bigtop-tests/test-artifacts/datafu/src/main/resources/datafu/util/intToBoolTest.pig
new file mode 100644
index 00000000..18cda425
--- /dev/null
+++ b/bigtop-tests/test-artifacts/datafu/src/main/resources/datafu/util/intToBoolTest.pig
@@ -0,0 +1,10 @@
+register $JAR_PATH
+
+define IntToBool datafu.pig.util.IntToBool();
+
+data = LOAD 'input' AS (val:INT);
+
+data2 = FOREACH data GENERATE IntToBool(val);
+
+STORE data2 INTO 'output';
+
diff --git a/bigtop-tests/test-artifacts/datafu/src/main/resources/datafu/util/intToBoolToIntTest.pig b/bigtop-tests/test-artifacts/datafu/src/main/resources/datafu/util/intToBoolToIntTest.pig
new file mode 100644
index 00000000..82d3ee08
--- /dev/null
+++ b/bigtop-tests/test-artifacts/datafu/src/main/resources/datafu/util/intToBoolToIntTest.pig
@@ -0,0 +1,12 @@
+register $JAR_PATH
+
+define IntToBool datafu.pig.util.IntToBool();
+define BoolToInt datafu.pig.util.BoolToInt();
+
+data = LOAD 'input' AS (val:INT);
+
+data2 = FOREACH data GENERATE IntToBool(val) as val;
+data3 = FOREACH data2 GENERATE BoolToInt(val) as val;
+
+STORE data3 INTO 'output';
+
diff --git a/bigtop-tests/test-artifacts/pom.xml b/bigtop-tests/test-artifacts/pom.xml
index e3777b76..ed05baeb 100644
--- a/bigtop-tests/test-artifacts/pom.xml
+++ b/bigtop-tests/test-artifacts/pom.xml
@@ -45,6 +45,7 @@
<module>hue</module>
<module>solr</module>
<module>crunch</module>
+ <module>datafu</module>
<module>fatjar</module>
</modules>
diff --git a/bigtop-tests/test-execution/smokes/datafu/pom.xml b/bigtop-tests/test-execution/smokes/datafu/pom.xml
new file mode 100644
index 00000000..50cb38d4
--- /dev/null
+++ b/bigtop-tests/test-execution/smokes/datafu/pom.xml
@@ -0,0 +1,123 @@
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+
+ <parent>
+ <groupId>org.apache.bigtop.itest</groupId>
+ <artifactId>smoke-tests</artifactId>
+ <version>0.6.0-SNAPSHOT</version>
+ <relativePath>../pom.xml</relativePath>
+ </parent>
+
+ <groupId>org.apache.bigtop.itest</groupId>
+ <artifactId>datafu-smoke-execution</artifactId>
+ <version>0.6.0-SNAPSHOT</version>
+ <name>datafu smoke test execution</name>
+
+ <properties>
+ <org.apache.maven-dependency-plugin.groupId>org.apache.bigtop.itest</org.apache.maven-dependency-plugin.groupId>
+ <org.apache.maven-dependency-plugin.artifactId>datafu-smoke</org.apache.maven-dependency-plugin.artifactId>
+ <org.apache.maven-dependency-plugin.version>${project.version}</org.apache.maven-dependency-plugin.version>
+ <org.apache.maven-dependency-plugin.output>${project.build.directory}</org.apache.maven-dependency-plugin.output>
+ <org.apache.maven-dependency-plugin.type>jar</org.apache.maven-dependency-plugin.type>
+ <org.apache.maven-failsafe-plugin.testInclude>**/*Tests*</org.apache.maven-failsafe-plugin.testInclude>
+
+ <HADOOP_MAPRED_HOME>${env.HADOOP_MAPRED_HOME}</HADOOP_MAPRED_HOME>
+ <HADOOP_CONF_DIR>${env.HADOOP_CONF_DIR}</HADOOP_CONF_DIR>
+ <PIG_HOME>${env.PIG_HOME}</PIG_HOME>
+ </properties>
+
+ <dependencies>
+ <dependency>
+ <groupId>${org.apache.maven-dependency-plugin.groupId}</groupId>
+ <artifactId>${org.apache.maven-dependency-plugin.artifactId}</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ </dependencies>
+
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-install-plugin</artifactId>
+ <executions>
+ <execution>
+ <phase>initialize</phase>
+ <goals>
+ <goal>install-file</goal>
+ </goals>
+ </execution>
+ </executions>
+ <configuration>
+ <file>${PIG_HOME}/pig.jar</file>
+ <groupId>org.apache.pig</groupId>
+ <artifactId>pig</artifactId>
+ <version>${pig.version}</version>
+ <packaging>jar</packaging>
+ </configuration>
+ </plugin>
+
+
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-dependency-plugin</artifactId>
+ </plugin>
+
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-enforcer-plugin</artifactId>
+ <version>1.0</version>
+ <executions>
+ <execution>
+ <id>enforce-property</id>
+ <goals>
+ <goal>enforce</goal>
+ </goals>
+ <configuration>
+ <rules>
+ <requireProperty>
+ <property>HADOOP_MAPRED_HOME</property>
+ <message>HADOOP_MAPRED_HOME env. variable has to be set</message>
+ </requireProperty>
+ <requireProperty>
+ <property>HADOOP_CONF_DIR</property>
+ <message>HADOOP_CONF_DIR env. variable has to be set</message>
+ </requireProperty>
+ <requireProperty>
+ <property>PIG_HOME</property>
+ <message>PIG_HOME env. variable has to be set</message>
+ </requireProperty>
+ </rules>
+ <fail>true</fail>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-failsafe-plugin</artifactId>
+ <version>2.11</version>
+ <configuration>
+ <forkMode>always</forkMode>
+ <argLine>-Dpigunit.exectype.cluster=true</argLine>
+ <additionalClasspathElements>
+ <additionalClasspathElement>${HADOOP_CONF_DIR}</additionalClasspathElement>
+ </additionalClasspathElements>
+ <systemPropertyVariables>
+ <datafu.jar.dir>${PIG_HOME}</datafu.jar.dir>
+ </systemPropertyVariables>
+ </configuration>
+
+ <!-- configuration>
+ <testSourceDirectory>/root/stacks/smokes/datafu/target/com/cloudera/itest/datafu/</testSourceDirectory>
+ <testClassesDirectory>/root/.m2/repository/com/cloudera/itest/datafu/4.1-cdh4u1-SNAPSHOT/</testClassesDirectory>
+ <skipTests>false</skipTests>
+ <testFailureIgnore>false</testFailureIgnore>
+ <argLine>-Dsun.lang.ClassLoader.allowArraySyntax=true -Djava.endorsed.dirs=${project.build.testOutputDirectory}/endorsed</argLine>
+ </configuration -->
+ </plugin>
+ </plugins>
+ </build>
+</project>
diff --git a/pom.xml b/pom.xml
index a8017adb..0f3a6324 100644
--- a/pom.xml
+++ b/pom.xml
@@ -179,13 +179,11 @@
<groupId>org.apache.pig</groupId>
<artifactId>pig</artifactId>
<version>${pig.version}</version>
- <scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.pig</groupId>
<artifactId>pigsmoke</artifactId>
<version>${pig-smoke.version}</version>
- <scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.zookeeper</groupId>
@@ -206,7 +204,6 @@
<groupId>org.apache.sqoop</groupId>
<artifactId>sqoop</artifactId>
<version>${sqoop.version}</version>
- <scope>test</scope>
</dependency>
</dependencies>
</dependencyManagement>