Browse Source

提交文件

Signed-off-by: Binren Zhang <zhangbr@elab-plus.com>
Binren Zhang 5 years ago
commit
e27fe63f77
100 changed files with 2706 additions and 0 deletions
  1. 2 0
      elab_udf.iml
  2. 56 0
      examples/com/aliyun/odps/examples/TestUtil.java
  3. 259 0
      examples/com/aliyun/odps/examples/graph/Kmeans.java
  4. 107 0
      examples/com/aliyun/odps/examples/graph/PageRank.java
  5. 130 0
      examples/com/aliyun/odps/examples/graph/SSSP.java
  6. 84 0
      examples/com/aliyun/odps/examples/mr/Resource.java
  7. 124 0
      examples/com/aliyun/odps/examples/mr/WordCount.java
  8. 106 0
      examples/com/aliyun/odps/examples/mr/test/WordCountTest.java
  9. 47 0
      examples/com/aliyun/odps/examples/udf/UDAFExample.java
  10. 93 0
      examples/com/aliyun/odps/examples/udf/UDAFResource.java
  11. 35 0
      examples/com/aliyun/odps/examples/udf/UDFExample.java
  12. 60 0
      examples/com/aliyun/odps/examples/udf/UDFResource.java
  13. 24 0
      examples/com/aliyun/odps/examples/udf/UDTFExample.java
  14. 68 0
      examples/com/aliyun/odps/examples/udf/UDTFResource.java
  15. 63 0
      examples/com/aliyun/odps/examples/udf/test/UDAFTest.java
  16. 69 0
      examples/com/aliyun/odps/examples/udf/test/UDFTest.java
  17. 68 0
      examples/com/aliyun/odps/examples/udf/test/UDTFTest.java
  18. 90 0
      examples/com/aliyun/odps/examples/udj/PayUserLogMergeJoin.java
  19. 240 0
      examples/com/aliyun/odps/examples/unstructured/SpeechSentenceSnrExtractor.java
  20. 18 0
      examples/com/aliyun/odps/examples/unstructured/SpeechStorageHandler.java
  21. 140 0
      examples/com/aliyun/odps/examples/unstructured/TextExtractor.java
  22. 56 0
      examples/com/aliyun/odps/examples/unstructured/TextOutputer.java
  23. 18 0
      examples/com/aliyun/odps/examples/unstructured/TextStorageHandler.java
  24. 111 0
      examples/com/aliyun/odps/examples/unstructured/test/ExtractorTest.java
  25. 133 0
      examples/com/aliyun/odps/examples/unstructured/test/OutputerTest.java
  26. 6 0
      examples/data/ambulance_csv/1.csv
  27. 9 0
      examples/data/ambulance_csv/2.csv
  28. BIN
      examples/data/speech_wav/tsh148_seg_2_3013_3_6_48_80bd359827e24dd7_0.wav
  29. BIN
      examples/data/speech_wav/tsh148_seg_3013_1_31_11_9d7c87aef9f3e559_0.wav
  30. BIN
      examples/data/speech_wav/tsh148_seg_3013_2_29_49_f4cb0990a6b4060c_0.wav
  31. 59 0
      pom.xml
  32. 287 0
      src/main/java/com/alibaba/dataworks/udtf/AdviserScoreUDTF.java
  33. 44 0
      src/test/java/Test.java
  34. BIN
      target/classes/com/alibaba/dataworks/udtf/AdviserScoreUDTF$Adviser.class
  35. BIN
      target/classes/com/alibaba/dataworks/udtf/AdviserScoreUDTF.class
  36. BIN
      target/test-classes/Test.class
  37. BIN
      target/test-classes/com/aliyun/odps/examples/TestUtil.class
  38. BIN
      target/test-classes/com/aliyun/odps/examples/graph/Kmeans$KmeansAggrValue.class
  39. BIN
      target/test-classes/com/aliyun/odps/examples/graph/Kmeans$KmeansAggregator.class
  40. BIN
      target/test-classes/com/aliyun/odps/examples/graph/Kmeans$KmeansVertex.class
  41. BIN
      target/test-classes/com/aliyun/odps/examples/graph/Kmeans$KmeansVertexReader.class
  42. BIN
      target/test-classes/com/aliyun/odps/examples/graph/Kmeans.class
  43. BIN
      target/test-classes/com/aliyun/odps/examples/graph/PageRank$PageRankVertex.class
  44. BIN
      target/test-classes/com/aliyun/odps/examples/graph/PageRank$PageRankVertexReader.class
  45. BIN
      target/test-classes/com/aliyun/odps/examples/graph/PageRank.class
  46. BIN
      target/test-classes/com/aliyun/odps/examples/graph/SSSP$MinLongCombiner.class
  47. BIN
      target/test-classes/com/aliyun/odps/examples/graph/SSSP$SSSPVertex.class
  48. BIN
      target/test-classes/com/aliyun/odps/examples/graph/SSSP$SSSPVertexReader.class
  49. BIN
      target/test-classes/com/aliyun/odps/examples/graph/SSSP.class
  50. BIN
      target/test-classes/com/aliyun/odps/examples/mr/Resource$TokenizerMapper.class
  51. BIN
      target/test-classes/com/aliyun/odps/examples/mr/Resource.class
  52. BIN
      target/test-classes/com/aliyun/odps/examples/mr/WordCount$SumCombiner.class
  53. BIN
      target/test-classes/com/aliyun/odps/examples/mr/WordCount$SumReducer.class
  54. BIN
      target/test-classes/com/aliyun/odps/examples/mr/WordCount$TokenizerMapper.class
  55. BIN
      target/test-classes/com/aliyun/odps/examples/mr/WordCount.class
  56. BIN
      target/test-classes/com/aliyun/odps/examples/mr/test/WordCountTest.class
  57. BIN
      target/test-classes/com/aliyun/odps/examples/udf/UDAFExample.class
  58. BIN
      target/test-classes/com/aliyun/odps/examples/udf/UDAFResource.class
  59. BIN
      target/test-classes/com/aliyun/odps/examples/udf/UDFExample.class
  60. BIN
      target/test-classes/com/aliyun/odps/examples/udf/UDFResource.class
  61. BIN
      target/test-classes/com/aliyun/odps/examples/udf/UDTFExample.class
  62. BIN
      target/test-classes/com/aliyun/odps/examples/udf/UDTFResource.class
  63. BIN
      target/test-classes/com/aliyun/odps/examples/udf/test/UDAFTest.class
  64. BIN
      target/test-classes/com/aliyun/odps/examples/udf/test/UDFTest.class
  65. BIN
      target/test-classes/com/aliyun/odps/examples/udf/test/UDTFTest.class
  66. BIN
      target/test-classes/com/aliyun/odps/examples/udj/PayUserLogMergeJoin.class
  67. BIN
      target/test-classes/com/aliyun/odps/examples/unstructured/SpeechSentenceSnrExtractor.class
  68. BIN
      target/test-classes/com/aliyun/odps/examples/unstructured/SpeechStorageHandler.class
  69. BIN
      target/test-classes/com/aliyun/odps/examples/unstructured/TextExtractor$1.class
  70. BIN
      target/test-classes/com/aliyun/odps/examples/unstructured/TextExtractor.class
  71. BIN
      target/test-classes/com/aliyun/odps/examples/unstructured/TextOutputer.class
  72. BIN
      target/test-classes/com/aliyun/odps/examples/unstructured/TextStorageHandler.class
  73. BIN
      target/test-classes/com/aliyun/odps/examples/unstructured/UtteranceLabel.class
  74. BIN
      target/test-classes/com/aliyun/odps/examples/unstructured/test/ExtractorTest.class
  75. BIN
      target/test-classes/com/aliyun/odps/examples/unstructured/test/OutputerTest.class
  76. 6 0
      target/test-classes/data/ambulance_csv/1.csv
  77. 9 0
      target/test-classes/data/ambulance_csv/2.csv
  78. BIN
      target/test-classes/data/speech_wav/tsh148_seg_2_3013_3_6_48_80bd359827e24dd7_0.wav
  79. BIN
      target/test-classes/data/speech_wav/tsh148_seg_3013_1_31_11_9d7c87aef9f3e559_0.wav
  80. BIN
      target/test-classes/data/speech_wav/tsh148_seg_3013_2_29_49_f4cb0990a6b4060c_0.wav
  81. 3 0
      temp/mr_20200824202014_624_15228/input/example_project/wc_in1/__schema__
  82. 4 0
      temp/mr_20200824202014_624_15228/input/example_project/wc_in1/data
  83. 5 0
      temp/mr_20200824202014_624_15228/input/example_project/wc_in2/__schema__
  84. 3 0
      temp/mr_20200824202014_624_15228/input/example_project/wc_in2/p1=2/p2=1/data
  85. 15 0
      temp/mr_20200824202014_624_15228/job.xml
  86. 5 0
      temp/mr_20200824202014_624_15228/output/__default__/R_000000
  87. 3 0
      temp/mr_20200824202014_624_15228/output/__default__/__schema__
  88. 3 0
      warehouse/example_project/__resources__/file_resource.txt
  89. 3 0
      warehouse/example_project/__resources__/kmeans_centers
  90. 10 0
      warehouse/example_project/__resources__/speech_model_random_5_utterance
  91. 1 0
      warehouse/example_project/__resources__/table_resource1/__ref__
  92. 1 0
      warehouse/example_project/__resources__/table_resource2/__ref__
  93. 3 0
      warehouse/example_project/__tables__/ads_log/__schema__
  94. 7 0
      warehouse/example_project/__tables__/ads_log/data
  95. 3 0
      warehouse/example_project/__tables__/kmeans_in/__schema__
  96. 6 0
      warehouse/example_project/__tables__/kmeans_in/data
  97. 3 0
      warehouse/example_project/__tables__/kmeans_out/__schema__
  98. 4 0
      warehouse/example_project/__tables__/kmeans_out/data
  99. 3 0
      warehouse/example_project/__tables__/pagerank_in/__schema__
  100. 0 0
      warehouse/example_project/__tables__/pagerank_in/data

+ 2 - 0
elab_udf.iml

@@ -0,0 +1,2 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<module type="JAVA_MODULE" version="4" />

+ 56 - 0
examples/com/aliyun/odps/examples/TestUtil.java

@@ -0,0 +1,56 @@
+package com.aliyun.odps.examples;
+
+import com.aliyun.odps.Odps;
+import com.aliyun.odps.account.Account;
+import com.aliyun.odps.account.AliyunAccount;
+import com.aliyun.odps.local.common.WareHouse;
+
+import java.io.File;
+
+public class TestUtil {
+  private final static String accessId = "accessId";
+  private final static String accessKey = "accessKey";
+  private final static String endpoint = "endpoint";
+  private final static String defaultProject = "example_project";
+
+  static Odps odps;
+  // Builds the shared client once at class-load time from the placeholder
+  // credentials, endpoint and default project declared above.
+  static {
+    Odps client = new Odps(new AliyunAccount(accessId, accessKey));
+    client.setEndpoint(endpoint);
+    client.setDefaultProject(defaultProject);
+    odps = client;
+  }
+
+  /**
+   * Joins the string forms of the given objects with commas.
+   *
+   * @param obj the elements to join; {@code null} elements are rendered as "null"
+   * @return the comma-separated text, an empty string for an empty array,
+   *         or {@code null} when {@code obj} itself is {@code null}
+   */
+  public static String join(Object[] obj) {
+    if (obj == null) {
+      return null;
+    }
+    StringBuilder sb = new StringBuilder();
+    for (int i = 0; i < obj.length; i++) {
+      // Decide on the separator by position, not by buffer length: a leading
+      // element that renders as "" must still be followed by a comma.
+      if (i > 0) {
+        sb.append(',');
+      }
+      sb.append(obj[i]);
+    }
+    return sb.toString();
+  }
+
+  /** Returns the shared {@link Odps} client assigned by this class's static initializer. */
+  public static Odps getOdps() {
+    return odps;
+  }
+
+  public static WareHouse initWarehouse() {
+    //init the warehouse in project dir
+    File exampleProjectDir = new File("warehouse" + File.separator + defaultProject);
+    if (exampleProjectDir.exists()) {
+      return WareHouse.getInstance("warehouse");
+    } else {
+      exampleProjectDir = new File("../warehouse" + File.separator + defaultProject);
+      if (exampleProjectDir.exists()) {
+        return WareHouse.getInstance("../warehouse");
+      }
+    }
+    throw new RuntimeException("warehouse dir not exists");
+  }
+
+}

+ 259 - 0
examples/com/aliyun/odps/examples/graph/Kmeans.java

@@ -0,0 +1,259 @@
+package com.aliyun.odps.examples.graph;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import com.aliyun.odps.data.TableInfo;
+import com.aliyun.odps.graph.Aggregator;
+import com.aliyun.odps.graph.ComputeContext;
+import com.aliyun.odps.graph.GraphJob;
+import com.aliyun.odps.graph.GraphLoader;
+import com.aliyun.odps.graph.MutationContext;
+import com.aliyun.odps.graph.Vertex;
+import com.aliyun.odps.graph.WorkerContext;
+import com.aliyun.odps.io.DoubleWritable;
+import com.aliyun.odps.io.LongWritable;
+import com.aliyun.odps.io.NullWritable;
+import com.aliyun.odps.io.Text;
+import com.aliyun.odps.io.Tuple;
+import com.aliyun.odps.io.Writable;
+import com.aliyun.odps.io.WritableRecord;
+
+/**
+ * Set resources arguments:
+ *  kmeans_centers 
+ *  Set program arguments: 
+ *  kmeans_in kmeans_out
+ */
+public class Kmeans {
+  private final static Log LOG = LogFactory.getLog(Kmeans.class);
+
+  public static class KmeansVertex extends Vertex<Text, Tuple, NullWritable, NullWritable> {
+
+    @Override
+    public void compute(ComputeContext<Text, Tuple, NullWritable, NullWritable> context,
+        Iterable<NullWritable> messages) throws IOException {
+      context.aggregate(getValue());
+    }
+
+  }
+
+  /** Loader that turns every input record into one {@link KmeansVertex}. */
+  public static class KmeansVertexReader extends
+      GraphLoader<Text, Tuple, NullWritable, NullWritable> {
+
+    /**
+     * Creates a vertex whose id is the record number (as text) and whose value is a
+     * tuple of all columns of the record, then requests its addition to the graph.
+     */
+    @Override
+    public void load(LongWritable recordNum, WritableRecord record,
+        MutationContext<Text, Tuple, NullWritable, NullWritable> context) throws IOException {
+      KmeansVertex point = new KmeansVertex();
+      Text pointId = new Text(String.valueOf(recordNum.get()));
+      point.setId(pointId);
+      Tuple coordinates = new Tuple(record.getAll());
+      point.setValue(coordinates);
+      context.addVertexRequest(point);
+    }
+
+  }
+
+  public static class KmeansAggrValue implements Writable {
+
+    Tuple centers = new Tuple();
+    Tuple sums = new Tuple();
+    Tuple counts = new Tuple();
+
+    public void write(DataOutput out) throws IOException {
+      centers.write(out);
+      sums.write(out);
+      counts.write(out);
+    }
+
+    public void readFields(DataInput in) throws IOException {
+      centers = new Tuple();
+      centers.readFields(in);
+      sums = new Tuple();
+      sums.readFields(in);
+      counts = new Tuple();
+      counts.readFields(in);
+    }
+
+    @Override
+    public String toString() {
+      return "centers " + centers.toString() + ", sums " + sums.toString() + ", counts "
+          + counts.toString();
+    }
+
+  }
+
+  public static class KmeansAggregator extends Aggregator<KmeansAggrValue> {
+
+    /**
+     * Produces the aggregator value used at the start of a superstep.
+     *
+     * <p>At superstep 0 the centers are parsed from the cache file
+     * "kmeans_centers" (one center per line, comma-separated coordinates) and the
+     * per-center sums and counts start at zero. At later supersteps the value
+     * aggregated in the previous superstep is reused.
+     *
+     * @param context worker context giving access to the superstep, cache file and last value
+     * @return the initial aggregate value for this superstep
+     * @throws IOException if the cache file cannot be read
+     * @throws NumberFormatException if a coordinate is not a valid double
+     */
+    @SuppressWarnings("rawtypes")
+    @Override
+    public KmeansAggrValue createInitialValue(WorkerContext context) throws IOException {
+      if (context.getSuperstep() != 0) {
+        return (KmeansAggrValue) context.getLastAggregatedValue(0);
+      }
+
+      // The field initializers of KmeansAggrValue already provide empty tuples.
+      KmeansAggrValue aggrVal = new KmeansAggrValue();
+
+      byte[] centers = context.readCacheFile("kmeans_centers");
+      // Decode with an explicit charset so parsing does not depend on the platform default.
+      String[] lines = new String(centers, java.nio.charset.Charset.forName("UTF-8")).split("\n");
+
+      for (String line : lines) {
+        // Skip blank lines (e.g. a lone "\r" left by CRLF files); they would
+        // otherwise fail number parsing with an empty string.
+        if (line.trim().isEmpty()) {
+          continue;
+        }
+        String[] ss = line.split(",");
+        Tuple center = new Tuple();
+        Tuple sum = new Tuple();
+        for (int j = 0; j < ss.length; ++j) {
+          center.append(new DoubleWritable(Double.parseDouble(ss[j].trim())));
+          sum.append(new DoubleWritable(0.0));
+        }
+        aggrVal.sums.append(sum);
+        aggrVal.counts.append(new LongWritable(0));
+        aggrVal.centers.append(center);
+      }
+
+      return aggrVal;
+    }
+
+    /**
+     * Assigns one point to its nearest center and adds it to that center's
+     * running coordinate sum and member count.
+     *
+     * @param value the aggregate being updated in place
+     * @param item the point, cast to a {@link Tuple} of {@link DoubleWritable}
+     */
+    @Override
+    public void aggregate(KmeansAggrValue value, Object item) {
+      Tuple point = (Tuple) item;
+      int nearest = 0;
+      double best = Double.MAX_VALUE;
+
+      int centerCount = value.centers.size();
+      for (int c = 0; c < centerCount; c++) {
+        Tuple center = (Tuple) value.centers.get(c);
+        // Squared Euclidean distance is enough for picking the minimum.
+        double squared = 0.0d;
+        int dims = center.size();
+        for (int d = 0; d < dims; d++) {
+          double px = ((DoubleWritable) point.get(d)).get();
+          double cx = ((DoubleWritable) center.get(d)).get();
+          double delta = px - cx;
+          squared += delta * delta;
+        }
+        // Strict comparison: on ties the earliest center wins.
+        if (squared < best) {
+          best = squared;
+          nearest = c;
+        }
+      }
+
+      // Fold the point into the chosen center's accumulators.
+      Tuple running = (Tuple) value.sums.get(nearest);
+      int pointDims = point.size();
+      for (int d = 0; d < pointDims; d++) {
+        DoubleWritable cell = (DoubleWritable) running.get(d);
+        double coordinate = ((DoubleWritable) point.get(d)).get();
+        cell.set(cell.get() + coordinate);
+      }
+      LongWritable members = (LongWritable) value.counts.get(nearest);
+      members.set(members.get() + 1);
+    }
+
+    @Override
+    public void merge(KmeansAggrValue value, KmeansAggrValue partial) {
+      for (int i = 0; i < value.sums.size(); i++) {
+        Tuple sum = (Tuple) value.sums.get(i);
+        Tuple that = (Tuple) partial.sums.get(i);
+
+        for (int j = 0; j < sum.size(); j++) {
+          DoubleWritable s = (DoubleWritable) sum.get(j);
+          s.set(s.get() + ((DoubleWritable) that.get(j)).get());
+        }
+      }
+
+      for (int i = 0; i < value.counts.size(); i++) {
+        LongWritable count = (LongWritable) value.counts.get(i);
+        count.set(count.get() + ((LongWritable) partial.counts.get(i)).get());
+      }
+    }
+
+    /**
+     * Finishes a superstep: derives new centers from the accumulated sums and
+     * counts, resets the accumulators, and decides whether to stop iterating.
+     *
+     * <p>A center that received no points keeps its previous coordinates; dividing
+     * its zero sum by a zero count would yield NaN, and a NaN distance never
+     * satisfies the convergence test.
+     *
+     * @param context worker context used for the superstep number, max iteration and output
+     * @param value the merged aggregate; its centers, sums and counts are updated in place
+     * @return {@code true} to stop (converged or last allowed iteration, centers
+     *         written to the output), {@code false} to continue
+     * @throws IOException if writing the centers fails
+     */
+    @SuppressWarnings("rawtypes")
+    @Override
+    public boolean terminate(WorkerContext context, KmeansAggrValue value) throws IOException {
+
+      // compute new centers
+      Tuple newCenters = new Tuple(value.sums.size());
+      for (int i = 0; i < value.sums.size(); i++) {
+        Tuple sum = (Tuple) value.sums.get(i);
+        Tuple previous = (Tuple) value.centers.get(i);
+        Tuple newCenter = new Tuple(sum.size());
+        LongWritable c = (LongWritable) value.counts.get(i);
+        long members = c.get();
+        for (int j = 0; j < sum.size(); j++) {
+
+          DoubleWritable s = (DoubleWritable) sum.get(j);
+          // Empty cluster: keep the old coordinate instead of computing 0.0 / 0.
+          double val =
+              members > 0 ? s.get() / members : ((DoubleWritable) previous.get(j)).get();
+          newCenter.set(j, new DoubleWritable(val));
+
+          // reset sum for next iteration
+          s.set(0.0d);
+        }
+        // reset count for next iteration
+        c.set(0);
+        newCenters.set(i, newCenter);
+      }
+
+      // update centers
+      Tuple oldCenters = value.centers;
+      value.centers = newCenters;
+
+      LOG.info("old centers: " + oldCenters + ", new centers: " + newCenters);
+
+      // compare new/old centers; stop checking at the first center that moved too far
+      boolean converged = true;
+      for (int i = 0; i < value.centers.size() && converged; i++) {
+        Tuple oldCenter = (Tuple) oldCenters.get(i);
+        Tuple newCenter = (Tuple) newCenters.get(i);
+        double sum = 0.0d;
+        for (int j = 0; j < newCenter.size(); j++) {
+          double v =
+              ((DoubleWritable) newCenter.get(j)).get() - ((DoubleWritable) oldCenter.get(j)).get();
+          sum += v * v;
+        }
+        double dist = Math.sqrt(sum);
+        LOG.info("old center: " + oldCenter + ", new center: " + newCenter + ", dist: " + dist);
+        // converge threshold for each center: 0.05
+        converged = dist < 0.05d;
+      }
+
+      if (converged || context.getSuperstep() == context.getMaxIteration() - 1) {
+        // converged or reach max iteration, output centers
+        for (int i = 0; i < value.centers.size(); i++) {
+          context.write(((Tuple) value.centers.get(i)).toArray());
+        }
+        // true means to terminate iteration
+        return true;
+      }
+
+      // false means to continue iteration
+      return false;
+    }
+  }
+
+  /** Prints the expected command-line arguments to stdout and exits the JVM with status -1. */
+  private static void printUsage() {
+    System.out.println("Usage: <in> <out> [Max iterations (default 30)]");
+    System.exit(-1);
+  }
+
+  /**
+   * Configures and runs the k-means graph job.
+   *
+   * @param args input table, output table, and an optional maximum iteration count
+   * @throws IOException if the job fails with an I/O error
+   */
+  public static void main(String[] args) throws IOException {
+    if (args.length < 2) {
+      printUsage();
+    }
+
+    GraphJob job = new GraphJob();
+
+    job.setGraphLoaderClass(KmeansVertexReader.class);
+    job.setRuntimePartitioning(false);
+    job.setVertexClass(KmeansVertex.class);
+    job.setAggregatorClass(KmeansAggregator.class);
+
+    TableInfo source = TableInfo.builder().tableName(args[0]).build();
+    job.addInput(source);
+    TableInfo sink = TableInfo.builder().tableName(args[1]).build();
+    job.addOutput(sink);
+
+    // 30 iterations unless a third argument overrides it
+    int maxIteration = 30;
+    if (args.length >= 3) {
+      maxIteration = Integer.parseInt(args[2]);
+    }
+    job.setMaxIteration(maxIteration);
+
+    long begin = System.currentTimeMillis();
+    job.run();
+    double elapsedSeconds = (System.currentTimeMillis() - begin) / 1000.0;
+    System.out.println("Job Finished in " + elapsedSeconds + " seconds");
+  }
+}

+ 107 - 0
examples/com/aliyun/odps/examples/graph/PageRank.java

@@ -0,0 +1,107 @@
+package com.aliyun.odps.examples.graph;
+
+import java.io.IOException;
+
+import com.aliyun.odps.data.TableInfo;
+import com.aliyun.odps.graph.ComputeContext;
+import com.aliyun.odps.graph.GraphJob;
+import com.aliyun.odps.graph.GraphLoader;
+import com.aliyun.odps.graph.MutationContext;
+import com.aliyun.odps.graph.Vertex;
+import com.aliyun.odps.graph.WorkerContext;
+import com.aliyun.odps.io.DoubleWritable;
+import com.aliyun.odps.io.LongWritable;
+import com.aliyun.odps.io.NullWritable;
+import com.aliyun.odps.io.Text;
+import com.aliyun.odps.io.Writable;
+import com.aliyun.odps.io.WritableRecord;
+
+/**
+ * Set program arguments:
+ * pagerank_in pagerank_out
+ * 
+ */
+public class PageRank {
+
+  /**
+   * PageRank vertex. Superstep 0 seeds every vertex with 1/N; each later
+   * superstep sets the value to 0.15/N + 0.85 * (sum of incoming messages).
+   * After updating, a vertex with edges sends value/outDegree to its neighbors.
+   */
+  public static class PageRankVertex extends
+      Vertex<Text, DoubleWritable, NullWritable, DoubleWritable> {
+
+    @Override
+    public void compute(ComputeContext<Text, DoubleWritable, NullWritable, DoubleWritable> context,
+        Iterable<DoubleWritable> messages) throws IOException {
+      if (context.getSuperstep() == 0) {
+        setValue(new DoubleWritable(1.0 / context.getTotalNumVertices()));
+      } else if (context.getSuperstep() >= 1) {
+        double sum = 0;
+        for (DoubleWritable msg : messages) {
+          sum += msg.get();
+        }
+        // Use double literals: with 0.15f/0.85f the teleport term was a
+        // single-precision division and the two factors did not sum to 1.
+        DoubleWritable vertexValue =
+            new DoubleWritable((0.15 / context.getTotalNumVertices()) + 0.85 * sum);
+        setValue(vertexValue);
+      }
+      if (hasEdges()) {
+        context.sendMessageToNeighbors(this, new DoubleWritable(getValue().get()
+            / getEdges().size()));
+      }
+    }
+
+    /** Writes this vertex's id and final value to the job output. */
+    @Override
+    public void cleanup(WorkerContext<Text, DoubleWritable, NullWritable, DoubleWritable> context)
+        throws IOException {
+      context.write(getId(), getValue());
+    }
+  }
+
+  /**
+   * Loader that builds one {@link PageRankVertex} per record: column 0 is the
+   * vertex id and every following non-null column is the id of a neighbor.
+   */
+  public static class PageRankVertexReader extends
+      GraphLoader<Text, DoubleWritable, NullWritable, DoubleWritable> {
+
+    @Override
+    public void load(LongWritable recordNum, WritableRecord record,
+        MutationContext<Text, DoubleWritable, NullWritable, DoubleWritable> context)
+        throws IOException {
+      PageRankVertex node = new PageRankVertex();
+      node.setValue(new DoubleWritable(0));
+      node.setId((Text) record.get(0));
+      System.out.println(record.get(0));
+
+      int columns = record.size();
+      for (int col = 1; col < columns; col++) {
+        Writable target = record.get(col);
+        System.out.println(target.toString());
+        // Null columns carry no neighbor.
+        if (target.equals(NullWritable.get())) {
+          continue;
+        }
+        node.addEdge(new Text(target.toString()), NullWritable.get());
+      }
+
+      int edgeCount;
+      if (node.hasEdges()) {
+        edgeCount = node.getEdges().size();
+      } else {
+        edgeCount = 0;
+      }
+      System.out.println("vertex edgs size: " + edgeCount);
+      context.addVertexRequest(node);
+    }
+
+  }
+
+  /** Prints the expected command-line arguments to stdout and exits the JVM with status -1. */
+  private static void printUsage() {
+    System.out.println("Usage: <in> <out> [Max iterations (default 30)]");
+    System.exit(-1);
+  }
+
+  /**
+   * Configures and runs the PageRank graph job.
+   *
+   * @param args input table, output table, and an optional maximum iteration count
+   * @throws IOException if the job fails with an I/O error
+   */
+  public static void main(String[] args) throws IOException {
+    if (args.length < 2) {
+      printUsage();
+    }
+
+    GraphJob job = new GraphJob();
+
+    job.setGraphLoaderClass(PageRankVertexReader.class);
+    job.setVertexClass(PageRankVertex.class);
+
+    TableInfo source = TableInfo.builder().tableName(args[0]).build();
+    job.addInput(source);
+    TableInfo sink = TableInfo.builder().tableName(args[1]).build();
+    job.addOutput(sink);
+
+    // 30 iterations unless a third argument overrides it
+    int maxIteration = 30;
+    if (args.length >= 3) {
+      maxIteration = Integer.parseInt(args[2]);
+    }
+    job.setMaxIteration(maxIteration);
+
+    long begin = System.currentTimeMillis();
+    job.run();
+    double elapsedSeconds = (System.currentTimeMillis() - begin) / 1000.0;
+    System.out.println("Job Finished in " + elapsedSeconds + " seconds");
+  }
+}

+ 130 - 0
examples/com/aliyun/odps/examples/graph/SSSP.java

@@ -0,0 +1,130 @@
+package com.aliyun.odps.examples.graph;
+
+import java.io.IOException;
+
+import com.aliyun.odps.io.WritableRecord;
+import com.aliyun.odps.graph.Combiner;
+import com.aliyun.odps.graph.ComputeContext;
+import com.aliyun.odps.graph.Edge;
+import com.aliyun.odps.graph.GraphJob;
+import com.aliyun.odps.graph.GraphLoader;
+import com.aliyun.odps.graph.MutationContext;
+import com.aliyun.odps.graph.Vertex;
+import com.aliyun.odps.graph.WorkerContext;
+import com.aliyun.odps.io.LongWritable;
+import com.aliyun.odps.data.TableInfo;
+
+/**
+ * Set program arguments: 
+ * 1 sssp_in sssp_out
+ * 
+ */
+public class SSSP {
+
+  public static final String START_VERTEX = "sssp.start.vertex.id";
+
+  /**
+   * Vertex for single-source shortest paths. The vertex value is the best known
+   * distance from the start vertex; {@code Long.MAX_VALUE} means "not reached".
+   * Edge values are edge lengths, and messages carry candidate distances.
+   */
+  public static class SSSPVertex extends
+      Vertex<LongWritable, LongWritable, LongWritable, LongWritable> {
+
+    // Start vertex id, parsed lazily from the job configuration; -1 means "not read yet".
+    private static long startVertexId = -1;
+
+    public SSSPVertex() {
+      this.setValue(new LongWritable(Long.MAX_VALUE));
+    }
+
+    /**
+     * Tells whether this vertex is the configured start vertex.
+     *
+     * @param context compute context whose configuration holds {@link #START_VERTEX}
+     * @return {@code true} if this vertex's id equals the configured start id
+     */
+    public boolean isStartVertex(
+        ComputeContext<LongWritable, LongWritable, LongWritable, LongWritable> context) {
+      if (startVertexId == -1) {
+        String s = context.getConfiguration().get(START_VERTEX);
+        startVertexId = Long.parseLong(s);
+      }
+      return getId().get() == startVertexId;
+    }
+
+    /**
+     * Takes the smallest candidate distance; if it improves on the stored value,
+     * stores it and relaxes all outgoing edges, otherwise votes to halt.
+     */
+    @Override
+    public void compute(
+        ComputeContext<LongWritable, LongWritable, LongWritable, LongWritable> context,
+        Iterable<LongWritable> messages) throws IOException {
+      // Use the same "infinity" as the constructor. With Integer.MAX_VALUE every
+      // unreached vertex lowered its value and sent bogus messages in the first
+      // superstep, and real distances >= Integer.MAX_VALUE were ignored.
+      long minDist = isStartVertex(context) ? 0 : Long.MAX_VALUE;
+
+      for (LongWritable msg : messages) {
+        if (msg.get() < minDist) {
+          minDist = msg.get();
+        }
+      }
+
+      // An unreached vertex never passes this test, so Long.MAX_VALUE is never
+      // added to an edge length.
+      if (minDist < this.getValue().get()) {
+        this.setValue(new LongWritable(minDist));
+        if (hasEdges()) {
+          for (Edge<LongWritable, LongWritable> e : this.getEdges()) {
+            context
+                .sendMessage(e.getDestVertexId(), new LongWritable(minDist + e.getValue().get()));
+          }
+        }
+      } else {
+        voteToHalt();
+      }
+    }
+
+    /** Writes this vertex's id and final distance to the job output. */
+    @Override
+    public void cleanup(
+        WorkerContext<LongWritable, LongWritable, LongWritable, LongWritable> context)
+        throws IOException {
+      context.write(getId(), getValue());
+    }
+  }
+
+  public static class MinLongCombiner extends Combiner<LongWritable, LongWritable> {
+
+    @Override
+    public void combine(LongWritable vertexId, LongWritable combinedMessage,
+        LongWritable messageToCombine) throws IOException {
+      if (combinedMessage.get() > messageToCombine.get()) {
+        combinedMessage.set(messageToCombine.get());
+      }
+    }
+
+  }
+
+  /**
+   * Loader that builds one {@link SSSPVertex} per record. Column 0 is the vertex
+   * id; column 1 lists outgoing edges as "dest:length" entries separated by ';'.
+   */
+  public static class SSSPVertexReader extends
+      GraphLoader<LongWritable, LongWritable, LongWritable, LongWritable> {
+
+    /**
+     * Parses the record into a vertex and requests its addition to the graph.
+     * A null or empty edge column, or empty entries between separators, yield
+     * no edges instead of failing, so vertices without outgoing edges can load.
+     *
+     * @throws NumberFormatException if a non-empty entry has a non-numeric part
+     * @throws ArrayIndexOutOfBoundsException if a non-empty entry lacks the ':' separator
+     */
+    @Override
+    public void load(LongWritable recordNum, WritableRecord record,
+        MutationContext<LongWritable, LongWritable, LongWritable, LongWritable> context)
+        throws IOException {
+      SSSPVertex vertex = new SSSPVertex();
+      vertex.setId((LongWritable) record.get(0));
+
+      Object edgeColumn = record.get(1);
+      String edgeList = edgeColumn == null ? "" : edgeColumn.toString();
+      for (String edge : edgeList.split(";")) {
+        // "".split(";") yields one empty entry; skip it and any stray blanks.
+        if (edge.trim().isEmpty()) {
+          continue;
+        }
+        String[] ss = edge.split(":");
+        vertex.addEdge(new LongWritable(Long.parseLong(ss[0])),
+            new LongWritable(Long.parseLong(ss[1])));
+      }
+
+      context.addVertexRequest(vertex);
+    }
+
+  }
+
+  public static void main(String[] args) throws IOException {
+    if (args.length < 2) {
+      System.out.println("Usage: <startnode> <input> <output>");
+      System.exit(-1);
+    }
+
+    GraphJob job = new GraphJob();
+    job.setGraphLoaderClass(SSSPVertexReader.class);
+    job.setVertexClass(SSSPVertex.class);
+    job.setCombinerClass(MinLongCombiner.class);
+
+    job.set(START_VERTEX, args[0]);
+    job.addInput(TableInfo.builder().tableName(args[1]).build());
+    job.addOutput(TableInfo.builder().tableName(args[2]).build());
+
+    long startTime = System.currentTimeMillis();
+    job.run();
+    System.out.println("Job Finished in " + (System.currentTimeMillis() - startTime) / 1000.0
+        + " seconds");
+  }
+}

+ 84 - 0
examples/com/aliyun/odps/examples/mr/Resource.java

@@ -0,0 +1,84 @@
+package com.aliyun.odps.examples.mr;
+
+import com.aliyun.odps.data.Record;
+import com.aliyun.odps.data.TableInfo;
+import com.aliyun.odps.mapred.JobClient;
+import com.aliyun.odps.mapred.MapperBase;
+import com.aliyun.odps.mapred.conf.JobConf;
+import com.aliyun.odps.mapred.utils.InputUtils;
+import com.aliyun.odps.mapred.utils.OutputUtils;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.util.Iterator;
+
+/*
+ * 该示例展示了如何在MapReduce程序中读取文件资源
+ * 该示例主要用于演示Local模式下的调试,如果要将该示例运行于在线环境,
+ * 请将 main方法中的语句 "job.setResources("file_resource.txt");" 删除
+ * 
+ * Usage: 
+ *  Set Resource arguments:
+ *  file_resource.txt 
+ *  Set program arguments:
+ *  wc_in1 rs_out
+ */
+public class Resource {
+
+  /**
+   * Mapper that, during setup, counts the lines of the file resource
+   * "file_resource.txt" and the records of the table resources "table_resource1"
+   * and "table_resource2", writing one (label, count) output record for each.
+   */
+  public static class TokenizerMapper extends MapperBase {
+    Record result;
+
+    @Override
+    public void setup(TaskContext context) throws IOException {
+      result = context.createOutputRecord();
+
+      long fileResourceLineCount =
+          countLines(context.readResourceFileAsStream("file_resource.txt"));
+      emit(context, "file_resource_line_count", fileResourceLineCount);
+
+      long tableResourceRecordCount =
+          countRecords(context.readResourceTable("table_resource1"));
+      emit(context, "table_resource1_record_count", tableResourceRecordCount);
+
+      tableResourceRecordCount = countRecords(context.readResourceTable("table_resource2"));
+      emit(context, "table_resource2_record_count", tableResourceRecordCount);
+    }
+
+    /** Counts the lines of the stream and closes it exactly once, even if reading fails. */
+    private static long countLines(InputStream in) throws IOException {
+      BufferedReader br = new BufferedReader(new InputStreamReader(in));
+      try {
+        long lines = 0;
+        while (br.readLine() != null) {
+          lines++;
+        }
+        return lines;
+      } finally {
+        br.close();
+      }
+    }
+
+    /** Counts the records remaining in the iterator by draining it. */
+    private static long countRecords(Iterator<Record> it) {
+      long records = 0;
+      while (it.hasNext()) {
+        it.next();
+        ++records;
+      }
+      return records;
+    }
+
+    /** Fills the reusable output record with (label, count) and writes it. */
+    private void emit(TaskContext context, String label, long count) throws IOException {
+      result.set(0, label);
+      result.set(1, count);
+      context.write(result);
+    }
+  }
+
+  /**
+   * Runs the resource-reading job: mapper only, input table "wc_in1", output table "rs_out".
+   *
+   * @param args unused
+   * @throws Exception if the job fails
+   */
+  public static void main(String[] args) throws Exception {
+    JobConf conf = new JobConf();
+    conf.setMapperClass(TokenizerMapper.class);
+    // Zero reduce tasks: the mapper's output is the job output.
+    conf.setNumReduceTasks(0);
+
+    TableInfo source = TableInfo.builder().tableName("wc_in1").build();
+    InputUtils.addTable(source, conf);
+    TableInfo sink = TableInfo.builder().tableName("rs_out").build();
+    OutputUtils.addTable(sink, conf);
+
+    JobClient.runJob(conf);
+  }
+
+}

+ 124 - 0
examples/com/aliyun/odps/examples/mr/WordCount.java

@@ -0,0 +1,124 @@
+package com.aliyun.odps.examples.mr;
+
+import com.aliyun.odps.counter.Counter;
+import com.aliyun.odps.data.Record;
+import com.aliyun.odps.data.TableInfo;
+import com.aliyun.odps.mapred.JobClient;
+import com.aliyun.odps.mapred.MapperBase;
+import com.aliyun.odps.mapred.ReducerBase;
+import com.aliyun.odps.mapred.RunningJob;
+import com.aliyun.odps.mapred.conf.JobConf;
+import com.aliyun.odps.mapred.utils.InputUtils;
+import com.aliyun.odps.mapred.utils.OutputUtils;
+import com.aliyun.odps.mapred.utils.SchemaUtils;
+
+import java.io.IOException;
+import java.util.Iterator;
+
+/*
+ * 该示例展示了MapReduce程序中的基本结构
+ */
+public class WordCount {
+
+  /**
+   * Mapper that splits every column of a record on whitespace and emits
+   * (token, 1) for each token, bumping two counters per emitted pair.
+   */
+  public static class TokenizerMapper extends MapperBase {
+
+    Record word;
+    Record one;
+    Counter gCnt;
+
+    /** Creates the reusable key/value records (value fixed to 1) and looks up the global counter. */
+    @Override
+    public void setup(TaskContext context) throws IOException {
+      word = context.createMapOutputKeyRecord();
+      one = context.createMapOutputValueRecord();
+      one.set(new Object[] {1L});
+      gCnt = context.getCounter("MyCounters", "global_counts");
+    }
+
+    @Override
+    public void map(long recordNum, Record record, TaskContext context) throws IOException {
+      int columns = record.getColumnCount();
+      for (int col = 0; col < columns; col++) {
+        String text = record.get(col).toString();
+        for (String token : text.split("\\s+")) {
+          word.set(new Object[] {token});
+          context.getCounter("MyCounters", "map_outputs").increment(1);
+          gCnt.increment(1);
+          context.write(word, one);
+        }
+      }
+    }
+  }
+
+  /**
+   * Combiner that collapses the map outputs of one key into a single partial sum.
+   */
+  public static class SumCombiner extends ReducerBase {
+    private Record count;
+
+    /** Creates the reusable record that carries the partial sum. */
+    @Override
+    public void setup(TaskContext context) throws IOException {
+      count = context.createMapOutputValueRecord();
+    }
+
+    @Override
+    public void reduce(Record key, Iterator<Record> values, TaskContext context) throws IOException {
+      long partial = 0;
+      while (values.hasNext()) {
+        partial += (Long) values.next().get(0);
+      }
+      count.set(0, partial);
+      context.write(key, count);
+    }
+  }
+
+  /**
+   * Reducer that writes (key, total) for each key, where total is the sum of
+   * the key's input values, and bumps two counters per written record.
+   */
+  public static class SumReducer extends ReducerBase {
+    private Record result;
+    Counter gCnt;
+
+    /** Creates the reusable output record and looks up the global counter. */
+    @Override
+    public void setup(TaskContext context) throws IOException {
+      result = context.createOutputRecord();
+      gCnt = context.getCounter("MyCounters", "global_counts");
+    }
+
+    @Override
+    public void reduce(Record key, Iterator<Record> values, TaskContext context) throws IOException {
+      long total = 0;
+      while (values.hasNext()) {
+        total += (Long) values.next().get(0);
+      }
+
+      result.set(0, key.get(0));
+      result.set(1, total);
+
+      context.getCounter("MyCounters", "reduce_outputs").increment(1);
+      gCnt.increment(1);
+
+      context.write(result);
+    }
+  }
+
+  /**
+   * Configures the word-count job and waits for it to complete. Inputs are
+   * columns col2/col3 of "wc_in1" and partition p1=2/p2=1 of "wc_in2"; the
+   * output table is "wc_out".
+   *
+   * @param args unused
+   * @throws Exception if the job fails
+   */
+  public static void main(String[] args) throws Exception {
+    JobConf conf = new JobConf();
+    conf.setMapperClass(TokenizerMapper.class);
+    conf.setCombinerClass(SumCombiner.class);
+    conf.setReducerClass(SumReducer.class);
+
+    conf.setMapOutputKeySchema(SchemaUtils.fromString("word:string"));
+    conf.setMapOutputValueSchema(SchemaUtils.fromString("count:bigint"));
+
+    TableInfo firstInput =
+        TableInfo.builder().tableName("wc_in1").cols(new String[] {"col2", "col3"}).build();
+    InputUtils.addTable(firstInput, conf);
+    TableInfo secondInput = TableInfo.builder().tableName("wc_in2").partSpec("p1=2/p2=1").build();
+    InputUtils.addTable(secondInput, conf);
+    TableInfo sink = TableInfo.builder().tableName("wc_out").build();
+    OutputUtils.addTable(sink, conf);
+
+    RunningJob running = JobClient.runJob(conf);
+    running.waitForCompletion();
+  }
+
+}

+ 106 - 0
examples/com/aliyun/odps/examples/mr/test/WordCountTest.java

@@ -0,0 +1,106 @@
+package com.aliyun.odps.examples.mr.test;
+
+import com.aliyun.odps.data.Record;
+import com.aliyun.odps.data.TableInfo;
+import com.aliyun.odps.examples.TestUtil;
+import com.aliyun.odps.examples.mr.WordCount;
+import com.aliyun.odps.io.Text;
+import com.aliyun.odps.mapred.conf.JobConf;
+import com.aliyun.odps.mapred.unittest.*;
+import com.aliyun.odps.mapred.utils.InputUtils;
+import com.aliyun.odps.mapred.utils.OutputUtils;
+import com.aliyun.odps.mapred.utils.SchemaUtils;
+import junit.framework.Assert;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.List;
+
+/**
+ * Unit tests for the mapper/combiner and the reducer of {@link WordCount},
+ * run through the MRUnitTest helpers against small in-memory inputs.
+ */
+public class WordCountTest extends MRUnitTest {
+  // Schemas of the input and output tables
+  private final static String INPUT_SCHEMA = "a:string,b:string";
+  private final static String OUTPUT_SCHEMA = "k:string,v:bigint";
+  private JobConf job;
+
+  /**
+   * Initializes the local warehouse and builds the job configuration shared
+   * by both tests.
+   */
+  public WordCountTest() throws Exception {
+    TestUtil.initWarehouse();
+    // Prepare the job configuration
+    job = new JobConf();
+
+    job.setMapperClass(WordCount.TokenizerMapper.class);
+    job.setCombinerClass(WordCount.SumCombiner.class);
+    job.setReducerClass(WordCount.SumReducer.class);
+
+    job.setMapOutputKeySchema(SchemaUtils.fromString("key:string"));
+    job.setMapOutputValueSchema(SchemaUtils.fromString("value:bigint"));
+
+    InputUtils.addTable(TableInfo.builder().tableName("wc_in").build(), job);
+    OutputUtils.addTable(TableInfo.builder().tableName("wc_out").build(), job);
+  }
+
+  /**
+   * Feeds the records ("hello","c") and ("hello","java") to the mapper and
+   * expects three combined pairs in this order: c=1, hello=2, java=1.
+   */
+  @SuppressWarnings("deprecation")
+  @Test
+  public void testMap() throws IOException, ClassNotFoundException, InterruptedException {
+    MapUTContext mapContext = new MapUTContext();
+    mapContext.setInputSchema(INPUT_SCHEMA);
+    mapContext.setOutputSchema(OUTPUT_SCHEMA, job);
+    // Prepare the test data
+    Record record = mapContext.createInputRecord();
+    record.set(new Text[] {new Text("hello"), new Text("c")});
+    mapContext.addInputRecord(record);
+
+    record = mapContext.createInputRecord();
+    record.set(new Text[] {new Text("hello"), new Text("java")});
+    mapContext.addInputRecord(record);
+    // Run the map phase
+    TaskOutput output = runMapper(job, mapContext);
+
+    // Verify the map output (the combiner has been applied): 3 key/value pairs
+    List<KeyValue<Record, Record>> kvs = output.getOutputKeyValues();
+    Assert.assertEquals(3, kvs.size());
+    Assert.assertEquals(new KeyValue<String, Long>(new String("c"), new Long(1)),
+        new KeyValue<String, Long>((String) (kvs.get(0).getKey().get(0)), (Long) (kvs.get(0)
+            .getValue().get(0))));
+    Assert.assertEquals(new KeyValue<String, Long>(new String("hello"), new Long(2)),
+        new KeyValue<String, Long>((String) (kvs.get(1).getKey().get(0)), (Long) (kvs.get(1)
+            .getValue().get(0))));
+    Assert.assertEquals(new KeyValue<String, Long>(new String("java"), new Long(1)),
+        new KeyValue<String, Long>((String) (kvs.get(2).getKey().get(0)), (Long) (kvs.get(2)
+            .getValue().get(0))));
+  }
+
+  /**
+   * Feeds the pairs world=1, hello=1, hello=1, odps=1 to the reducer and
+   * expects three output records in this order: hello=2, odps=1, world=1.
+   */
+  @Test
+  public void testReduce() throws IOException, ClassNotFoundException, InterruptedException {
+    ReduceUTContext context = new ReduceUTContext();
+    context.setOutputSchema(OUTPUT_SCHEMA,  job);
+    // Prepare the test data
+    // NOTE(review): the same key/value records are mutated and re-added below;
+    // presumably addInputKeyValue copies them -- confirm against the framework.
+    Record key = context.createInputKeyRecord(job);
+    Record value = context.createInputValueRecord(job);
+    key.set(0, "world");
+    value.set(0, new Long(1));
+    context.addInputKeyValue(key, value);
+    key.set(0, "hello");
+    value.set(0, new Long(1));
+    context.addInputKeyValue(key, value);
+    key.set(0, "hello");
+    value.set(0, new Long(1));
+    context.addInputKeyValue(key, value);
+    key.set(0, "odps");
+    value.set(0, new Long(1));
+    context.addInputKeyValue(key, value);
+
+    // Run the reduce phase
+    TaskOutput output = runReducer(job, context);
+
+    // Verify the reduce output: 3 records
+    List<Record> records = output.getOutputRecords();
+    Assert.assertEquals(3, records.size());
+    Assert.assertEquals(new String("hello"), records.get(0).get("k"));
+    Assert.assertEquals(new Long(2), records.get(0).get("v"));
+    Assert.assertEquals(new String("odps"), records.get(1).get("k"));
+    Assert.assertEquals(new Long(1), records.get(1).get("v"));
+    Assert.assertEquals(new String("world"), records.get(2).get("k"));
+    Assert.assertEquals(new Long(1), records.get(2).get("v"));
+  }
+
+}

+ 47 - 0
examples/com/aliyun/odps/examples/udf/UDAFExample.java

@@ -0,0 +1,47 @@
+package com.aliyun.odps.examples.udf;
+
+import com.aliyun.odps.io.LongWritable;
+import com.aliyun.odps.io.Text;
+import com.aliyun.odps.io.Writable;
+import com.aliyun.odps.udf.Aggregator;
+import com.aliyun.odps.udf.UDFException;
+import com.aliyun.odps.udf.annotation.Resolve;
+
+/**
+ * Example aggregator that sums {@code Text#getLength()} over every string
+ * argument it is fed. Null arguments are skipped.
+ *
+ * project: example_project
+ * table: wc_in2
+ * partitions: p2=1,p1=2
+ * columns: colc,colb,cola
+ */
+@Resolve("string->bigint")
+public class UDAFExample extends Aggregator {
+
+  /**
+   * Adds the length of each non-null argument of one input row to the buffer.
+   *
+   * @param buffer the {@link LongWritable} running total created by {@link #newBuffer()}
+   * @param args the arguments of the current row; each non-null entry must be a {@link Text}
+   */
+  @Override
+  public void iterate(Writable buffer, Writable[] args) throws UDFException {
+    LongWritable result = (LongWritable) buffer;
+    for (Writable item : args) {
+      if (item == null) {
+        // A null argument (e.g. a NULL column value) has no length to add;
+        // dereferencing it would abort the whole aggregation with an NPE.
+        continue;
+      }
+      Text txt = (Text) item;
+      result.set(result.get() + txt.getLength());
+    }
+
+  }
+
+  /**
+   * Adds a partial total to the buffer.
+   *
+   * @param buffer the {@link LongWritable} total being built
+   * @param partial a {@link LongWritable} partial total to fold in
+   */
+  @Override
+  public void merge(Writable buffer, Writable partial) throws UDFException {
+    LongWritable result = (LongWritable) buffer;
+    LongWritable partialResult = (LongWritable) partial;
+    result.set(result.get() + partialResult.get());
+
+  }
+
+  /** Returns a fresh running total starting at zero. */
+  @Override
+  public Writable newBuffer() {
+    return new LongWritable(0L);
+  }
+
+  /** Returns the buffer itself as the final result. */
+  @Override
+  public Writable terminate(Writable buffer) throws UDFException {
+    return buffer;
+  }
+
+}

+ 93 - 0
examples/com/aliyun/odps/examples/udf/UDAFResource.java

@@ -0,0 +1,93 @@
+package com.aliyun.odps.examples.udf;
+
+import com.aliyun.odps.io.LongWritable;
+import com.aliyun.odps.io.Text;
+import com.aliyun.odps.io.Writable;
+import com.aliyun.odps.udf.Aggregator;
+import com.aliyun.odps.udf.ExecutionContext;
+import com.aliyun.odps.udf.UDFException;
+import com.aliyun.odps.udf.annotation.Resolve;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.util.Iterator;
+
+/**
+ * project: example_project 
+ * table: wc_in2 
+ * partitions: p2=1,p1=2 
+ * columns: colc,colb,cola
+ */
+@Resolve("string->bigint")
+public class UDAFResource extends Aggregator {
+  ExecutionContext ctx;
+  long fileResourceLineCount;
+  long tableResource1RecordCount;
+  long tableResource2RecordCount;
+
+  @Override
+  public void setup(ExecutionContext ctx) throws UDFException {
+    this.ctx = ctx;
+    try {
+      InputStream in = ctx.readResourceFileAsStream("file_resource.txt");
+      BufferedReader br = new BufferedReader(new InputStreamReader(in));
+      fileResourceLineCount = 0;
+      String line;
+      while ((line = br.readLine()) != null) {
+        fileResourceLineCount++;
+      }
+      br.close();
+
+      Iterator<Object[]> iterator = ctx.readResourceTable("table_resource1").iterator();
+      tableResource1RecordCount = 0;
+      while (iterator.hasNext()) {
+        tableResource1RecordCount++;
+        iterator.next();
+      }
+
+      iterator = ctx.readResourceTable("table_resource2").iterator();
+      tableResource2RecordCount = 0;
+      while (iterator.hasNext()) {
+        tableResource2RecordCount++;
+        iterator.next();
+      }
+
+    } catch (IOException e) {
+      throw new UDFException(e);
+    }
+  }
+
+  @Override
+  public void iterate(Writable arg0, Writable[] arg1) throws UDFException {
+    LongWritable result = (LongWritable) arg0;
+    for (Writable item : arg1) {
+      Text txt = (Text) item;
+      result.set(result.get() + txt.getLength());
+    }
+
+  }
+
+  @Override
+  public void merge(Writable arg0, Writable arg1) throws UDFException {
+    LongWritable result = (LongWritable) arg0;
+    LongWritable partial = (LongWritable) arg1;
+    result.set(result.get() + partial.get());
+
+  }
+
+  @Override
+  public Writable newBuffer() {
+    return new LongWritable(0L);
+  }
+
+  @Override
+  public Writable terminate(Writable arg0) throws UDFException {
+    LongWritable result = (LongWritable) arg0;
+    result.set(result.get() + fileResourceLineCount + tableResource1RecordCount
+        + tableResource2RecordCount);
+    return result;
+  }
+
+}

+ 35 - 0
examples/com/aliyun/odps/examples/udf/UDFExample.java

@@ -0,0 +1,35 @@
+package com.aliyun.odps.examples.udf;
+
+import com.aliyun.odps.udf.UDF;
+
+public class UDFExample extends UDF {
+
+  /**
+   * project: example_project
+   * table: wc_in1
+   * columns: col1
+   */
+  public String evaluate(String a) {
+    return "s2s:" + a;
+  }
+
+  /**
+   * project: example_project 
+   * table: wc_in1 
+   * columns: col1,col2
+   */
+  public String evaluate(String a, String b) {
+    return "ss2s:" + a + "," + b;
+  }
+
+  /**
+   * project: example_project 
+   * table: wc_in2 
+   * partitions: p2=1,p1=2 
+   * columns: colc,colb,cola
+   */
+  public String evaluate(String a, String b, String c) {
+    return "sss2s:" + a + "," + b + "," + c;
+  }
+
+}

+ 60 - 0
examples/com/aliyun/odps/examples/udf/UDFResource.java

@@ -0,0 +1,60 @@
+package com.aliyun.odps.examples.udf;
+
+import com.aliyun.odps.udf.ExecutionContext;
+import com.aliyun.odps.udf.UDF;
+import com.aliyun.odps.udf.UDFException;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.util.Iterator;
+
+public class UDFResource extends UDF {
+  ExecutionContext ctx;
+  long fileResourceLineCount;
+  long tableResource1RecordCount;
+  long tableResource2RecordCount;
+
+  @Override
+  public void setup(ExecutionContext ctx) throws UDFException {
+    this.ctx = ctx;
+    try {
+      InputStream in = ctx.readResourceFileAsStream("file_resource.txt");
+      BufferedReader br = new BufferedReader(new InputStreamReader(in));
+      String line;
+      fileResourceLineCount = 0;
+      while ((line = br.readLine()) != null) {
+        fileResourceLineCount++;
+      }
+      br.close();
+
+      Iterator<Object[]> iterator = ctx.readResourceTable("table_resource1").iterator();
+      tableResource1RecordCount = 0;
+      while (iterator.hasNext()) {
+        tableResource1RecordCount++;
+        iterator.next();
+      }
+
+      iterator = ctx.readResourceTable("table_resource2").iterator();
+      tableResource2RecordCount = 0;
+      while (iterator.hasNext()) {
+        tableResource2RecordCount++;
+        iterator.next();
+      }
+
+    } catch (IOException e) {
+      throw new UDFException(e);
+    }
+  }
+
+  /**
+   * project: example_project table: wc_in2 partitions: p2=1,p1=2 columns: colc,colb
+   */
+  public String evaluate(String a, String b) {
+    return "ss2s:" + a + "," + b + "|fileResourceLineCount=" + fileResourceLineCount
+        + "|tableResource1RecordCount=" + tableResource1RecordCount + "|tableResource2RecordCount="
+        + tableResource2RecordCount;
+  }
+
+}

+ 24 - 0
examples/com/aliyun/odps/examples/udf/UDTFExample.java

@@ -0,0 +1,24 @@
+package com.aliyun.odps.examples.udf;
+
+import com.aliyun.odps.udf.UDFException;
+import com.aliyun.odps.udf.UDTF;
+import com.aliyun.odps.udf.annotation.Resolve;
+
+/**
+ * Example table function: for each input row it forwards the first argument
+ * unchanged together with the length of the second argument (0 when the
+ * second argument is null).
+ *
+ * project: example_project
+ * table: wc_in2
+ * partitions: p2=1,p1=2
+ * columns: colc,colb
+ */
+@Resolve({"string,string->string,bigint"})
+public class UDTFExample extends UDTF {
+
+  /**
+   * @param args two values, both cast to {@link String}; the second may be null
+   */
+  @Override
+  public void process(Object[] args) throws UDFException {
+    String first = (String) args[0];
+    Object second = args[1];
+
+    long secondLength;
+    if (second == null) {
+      secondLength = 0;
+    } else {
+      secondLength = ((String) second).length();
+    }
+
+    forward(first, secondLength);
+  }
+}

+ 68 - 0
examples/com/aliyun/odps/examples/udf/UDTFResource.java

@@ -0,0 +1,68 @@
+package com.aliyun.odps.examples.udf;
+
+import com.aliyun.odps.udf.ExecutionContext;
+import com.aliyun.odps.udf.UDFException;
+import com.aliyun.odps.udf.UDTF;
+import com.aliyun.odps.udf.annotation.Resolve;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.util.Iterator;
+
+/**
+ * project: example_project 
+ * table: wc_in2 
+ * partitions: p2=1,p1=2 
+ * columns: colc,colb
+ */
+@Resolve({"string,string->string,bigint,string"})
+public class UDTFResource extends UDTF {
+  ExecutionContext ctx;
+  long fileResourceLineCount;
+  long tableResource1RecordCount;
+  long tableResource2RecordCount;
+
+  @Override
+  public void setup(ExecutionContext ctx) throws UDFException {
+    this.ctx = ctx;
+    try {
+      InputStream in = ctx.readResourceFileAsStream("file_resource.txt");
+      BufferedReader br = new BufferedReader(new InputStreamReader(in));
+      String line;
+      fileResourceLineCount = 0;
+      while ((line = br.readLine()) != null) {
+        fileResourceLineCount++;
+      }
+      br.close();
+
+      Iterator<Object[]> iterator = ctx.readResourceTable("table_resource1").iterator();
+      tableResource1RecordCount = 0;
+      while (iterator.hasNext()) {
+        tableResource1RecordCount++;
+        iterator.next();
+      }
+
+      iterator = ctx.readResourceTable("table_resource2").iterator();
+      tableResource2RecordCount = 0;
+      while (iterator.hasNext()) {
+        tableResource2RecordCount++;
+        iterator.next();
+      }
+
+    } catch (IOException e) {
+      throw new UDFException(e);
+    }
+  }
+
+  @Override
+  public void process(Object[] args) throws UDFException {
+    String a = (String) args[0];
+    long b = args[1] == null ? 0 : ((String) args[1]).length();
+
+    forward(a, b, "fileResourceLineCount=" + fileResourceLineCount + "|tableResource1RecordCount="
+        + tableResource1RecordCount + "|tableResource2RecordCount=" + tableResource2RecordCount);
+
+  }
+}

+ 63 - 0
examples/com/aliyun/odps/examples/udf/test/UDAFTest.java

@@ -0,0 +1,63 @@
+package com.aliyun.odps.examples.udf.test;
+
+import com.aliyun.odps.examples.TestUtil;
+import com.aliyun.odps.udf.local.datasource.InputSource;
+import com.aliyun.odps.udf.local.datasource.TableInputSource;
+import com.aliyun.odps.udf.local.runner.AggregatorRunner;
+import com.aliyun.odps.udf.local.runner.BaseRunner;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.util.List;
+
+/**
+ * Local-runner tests for the example aggregators UDAFExample and
+ * UDAFResource.
+ */
+public class UDAFTest {
+
+  /** Initializes the local warehouse once before any test runs. */
+  @BeforeClass
+  public static void initWarehouse() {
+    TestUtil.initWarehouse();
+  }
+
+  /**
+   * Feeds three two-string rows directly; the expected total 24 is the sum of
+   * the lengths of all six strings (3+3+5+5+4+4).
+   */
+  @Test
+  public void simpleInput() throws Exception{
+    BaseRunner runner = new AggregatorRunner(null,
+        "com.aliyun.odps.examples.udf.UDAFExample");
+    runner.feed(new Object[] { "one", "one" }).feed(new Object[] { "three", "three" })
+        .feed(new Object[] { "four", "four" });
+    List<Object[]> out = runner.yield();
+    Assert.assertEquals(1, out.size());
+    Assert.assertEquals(24L, out.get(0)[0]);
+  }
+
+  /**
+   * Feeds columns colc and cola of partition p2=1/p1=2 of
+   * example_project.wc_in2; the expected total 36 depends on the rows stored
+   * in the local warehouse.
+   */
+  @Test
+  public void inputFromTable() throws Exception{
+    BaseRunner runner = new AggregatorRunner(TestUtil.getOdps(),
+        "com.aliyun.odps.examples.udf.UDAFExample");
+    // partition table
+    String project = "example_project";
+    String table = "wc_in2";
+    String[] partitions = new String[] { "p2=1", "p1=2" };
+    String[] columns = new String[] { "colc", "cola" };
+    InputSource inputSource = new TableInputSource(project, table, partitions, columns);
+    Object[] data;
+    // getNextRow() returns null once the table is exhausted.
+    while ((data = inputSource.getNextRow()) != null) {
+      runner.feed(data);
+    }
+    List<Object[]> out = runner.yield();
+    Assert.assertEquals(1, out.size());
+    Assert.assertEquals(36L, out.get(0)[0]);
+  }
+
+  /**
+   * Same input as simpleInput, run through UDAFResource, which adds its three
+   * resource counts to the total.
+   */
+  @Test
+  public void resourceTest() throws Exception{
+    BaseRunner runner = new AggregatorRunner(TestUtil.getOdps(),
+        "com.aliyun.odps.examples.udf.UDAFResource");
+    runner.feed(new Object[] { "one", "one" }).feed(new Object[] { "three", "three" })
+        .feed(new Object[] { "four", "four" });
+    List<Object[]> out = runner.yield();
+    Assert.assertEquals(1, out.size());
+    // 24+3+4+4
+    Assert.assertEquals(35L, out.get(0)[0]);
+  }
+
+}

+ 69 - 0
examples/com/aliyun/odps/examples/udf/test/UDFTest.java

@@ -0,0 +1,69 @@
+package com.aliyun.odps.examples.udf.test;
+
+import com.aliyun.odps.examples.TestUtil;
+import com.aliyun.odps.udf.local.datasource.InputSource;
+import com.aliyun.odps.udf.local.datasource.TableInputSource;
+import com.aliyun.odps.udf.local.runner.BaseRunner;
+import com.aliyun.odps.udf.local.runner.UDFRunner;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.util.List;
+
+/**
+ * Local-runner tests for the example scalar functions UDFExample and
+ * UDFResource.
+ */
+public class UDFTest {
+
+  /** Initializes the local warehouse once before any test runs. */
+  @BeforeClass
+  public static void initWarehouse() {
+    TestUtil.initWarehouse();
+  }
+
+  /**
+   * Feeds three two-string rows directly and expects one "ss2s:"-prefixed
+   * result per row, in input order.
+   */
+  @Test
+  public void simpleInput() throws Exception{
+    BaseRunner runner = new UDFRunner(null, "com.aliyun.odps.examples.udf.UDFExample");
+    runner.feed(new Object[] { "one", "one" }).feed(new Object[] { "three", "three" })
+        .feed(new Object[] { "four", "four" });
+    List<Object[]> out = runner.yield();
+
+    Assert.assertEquals(3, out.size());
+    Assert.assertEquals("ss2s:one,one", TestUtil.join(out.get(0)));
+    Assert.assertEquals("ss2s:three,three", TestUtil.join(out.get(1)));
+    Assert.assertEquals("ss2s:four,four", TestUtil.join(out.get(2)));
+  }
+
+  /**
+   * Feeds columns colc and cola of partition p2=1/p1=2 of
+   * example_project.wc_in2; the expected strings depend on the rows stored in
+   * the local warehouse.
+   */
+  @Test
+  public void inputFromTable() throws Exception{
+    BaseRunner runner = new UDFRunner(TestUtil.getOdps(), "com.aliyun.odps.examples.udf.UDFExample");
+    String project = "example_project";
+    String table = "wc_in2";
+    String[] partitions = new String[] { "p2=1", "p1=2" };
+    String[] columns = new String[] { "colc", "cola" };
+    InputSource inputSource = new TableInputSource(project, table, partitions, columns);
+    Object[] data;
+    // getNextRow() returns null once the table is exhausted.
+    while ((data = inputSource.getNextRow()) != null) {
+      runner.feed(data);
+    }
+    List<Object[]> out = runner.yield();
+    Assert.assertEquals(3, out.size());
+    Assert.assertEquals("ss2s:three3,three1", TestUtil.join(out.get(0)));
+    Assert.assertEquals("ss2s:three3,three1", TestUtil.join(out.get(1)));
+    Assert.assertEquals("ss2s:three3,three1", TestUtil.join(out.get(2)));
+  }
+
+  /**
+   * Same input as simpleInput, run through UDFResource, which appends the
+   * three resource counts (3, 4 and 4) to every result.
+   */
+  @Test
+  public void resourceTest() throws Exception{
+    BaseRunner runner = new UDFRunner(TestUtil.getOdps(), "com.aliyun.odps.examples.udf.UDFResource");
+    runner.feed(new Object[] { "one", "one" }).feed(new Object[] { "three", "three" })
+        .feed(new Object[] { "four", "four" });
+    List<Object[]> out = runner.yield();
+
+    Assert.assertEquals(3, out.size());
+    Assert.assertEquals("ss2s:one,one|fileResourceLineCount=3|tableResource1RecordCount=4|tableResource2RecordCount=4",
+        TestUtil.join(out.get(0)));
+    Assert.assertEquals("ss2s:three,three|fileResourceLineCount=3|tableResource1RecordCount=4|tableResource2RecordCount=4",
+        TestUtil.join(out.get(1)));
+    Assert.assertEquals("ss2s:four,four|fileResourceLineCount=3|tableResource1RecordCount=4|tableResource2RecordCount=4",
+        TestUtil.join(out.get(2)));
+  }
+
+}

+ 68 - 0
examples/com/aliyun/odps/examples/udf/test/UDTFTest.java

@@ -0,0 +1,68 @@
+package com.aliyun.odps.examples.udf.test;
+
+import com.aliyun.odps.examples.TestUtil;
+import com.aliyun.odps.udf.local.datasource.InputSource;
+import com.aliyun.odps.udf.local.datasource.TableInputSource;
+import com.aliyun.odps.udf.local.runner.BaseRunner;
+import com.aliyun.odps.udf.local.runner.UDTFRunner;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.util.List;
+
+/**
+ * Local-runner tests for the example table functions UDTFExample and
+ * UDTFResource.
+ */
+public class UDTFTest {
+
+  /** Initializes the local warehouse once before any test runs. */
+  @BeforeClass
+  public static void initWarehouse() {
+    TestUtil.initWarehouse();
+  }
+
+  /**
+   * Feeds three two-string rows directly and expects, per row, the first
+   * string and the length of the second.
+   */
+  @Test
+  public void simpleInput() throws Exception{
+    BaseRunner runner = new UDTFRunner(null, "com.aliyun.odps.examples.udf.UDTFExample");
+    runner.feed(new Object[] { "one", "one" }).feed(new Object[] { "three", "three" })
+        .feed(new Object[] { "four", "four" });
+    List<Object[]> out = runner.yield();
+    Assert.assertEquals(3, out.size());
+    Assert.assertEquals("one,3", TestUtil.join(out.get(0)));
+    Assert.assertEquals("three,5", TestUtil.join(out.get(1)));
+    Assert.assertEquals("four,4", TestUtil.join(out.get(2)));
+  }
+
+  /**
+   * Feeds columns colc and cola of partition p2=1/p1=2 of
+   * example_project.wc_in2; the expected strings depend on the rows stored in
+   * the local warehouse.
+   */
+  @Test
+  public void inputFromTable() throws Exception{
+    BaseRunner runner = new UDTFRunner(TestUtil.getOdps(), "com.aliyun.odps.examples.udf.UDTFExample");
+    String project = "example_project";
+    String table = "wc_in2";
+    String[] partitions = new String[] { "p2=1", "p1=2" };
+    String[] columns = new String[] { "colc", "cola" };
+
+    InputSource inputSource = new TableInputSource(project, table, partitions, columns);
+    Object[] data;
+    // getNextRow() returns null once the table is exhausted.
+    while ((data = inputSource.getNextRow()) != null) {
+      runner.feed(data);
+    }
+    List<Object[]> out = runner.yield();
+    Assert.assertEquals(3, out.size());
+    Assert.assertEquals("three3,6", TestUtil.join(out.get(0)));
+    Assert.assertEquals("three3,6", TestUtil.join(out.get(1)));
+    Assert.assertEquals("three3,6", TestUtil.join(out.get(2)));
+  }
+
+  /**
+   * Same input as simpleInput, run through UDTFResource, which adds a third
+   * column holding the three resource counts (3, 4 and 4).
+   */
+  @Test
+  public void resourceTest() throws Exception{
+    BaseRunner runner = new UDTFRunner(TestUtil.getOdps(), "com.aliyun.odps.examples.udf.UDTFResource");
+    runner.feed(new Object[] { "one", "one" }).feed(new Object[] { "three", "three" })
+        .feed(new Object[] { "four", "four" });
+    List<Object[]> out = runner.yield();
+    // Compare the row count numerically, like every other test in this class.
+    Assert.assertEquals(3, out.size());
+    Assert.assertEquals("one,3,fileResourceLineCount=3|tableResource1RecordCount=4|tableResource2RecordCount=4",
+        TestUtil.join(out.get(0)));
+    Assert.assertEquals("three,5,fileResourceLineCount=3|tableResource1RecordCount=4|tableResource2RecordCount=4",
+        TestUtil.join(out.get(1)));
+    Assert.assertEquals("four,4,fileResourceLineCount=3|tableResource1RecordCount=4|tableResource2RecordCount=4",
+        TestUtil.join(out.get(2)));
+  }
+
+}

+ 90 - 0
examples/com/aliyun/odps/examples/udj/PayUserLogMergeJoin.java

@@ -0,0 +1,90 @@
+package com.aliyun.odps.examples.udj;
+
+import com.aliyun.odps.Column;
+import com.aliyun.odps.OdpsType;
+import com.aliyun.odps.Yieldable;
+import com.aliyun.odps.data.ArrayRecord;
+import com.aliyun.odps.data.Record;
+import com.aliyun.odps.udf.DataAttributes;
+import com.aliyun.odps.udf.ExecutionContext;
+import com.aliyun.odps.udf.UDJ;
+import com.aliyun.odps.udf.annotation.Resolve;
+import java.util.ArrayList;
+import java.util.Iterator;
+/**
+ * User-defined join that, for every record of the right group, looks up the
+ * left-group record whose datetime (column 0) is closest and emits the right
+ * record's time together with both string payloads (column 1) merged.
+ */
+@Resolve("->string,bigint,string")
+public class PayUserLogMergeJoin extends UDJ {
+  private Record outputRecord;
+
+  /**
+   * Called before any data is processed; allocates the single output record
+   * (user_id string, time bigint, content string) that every call reuses.
+   */
+  @Override
+  public void setup(ExecutionContext executionContext, DataAttributes dataAttributes) {
+    Column[] outputColumns = {
+        new Column("user_id", OdpsType.STRING),
+        new Column("time", OdpsType.BIGINT),
+        new Column("content", OdpsType.STRING)
+    };
+    outputRecord = new ArrayRecord(outputColumns);
+  }
+
+  /**
+   * Joins the two record groups that share {@code key}.
+   *
+   * @param key Current join key; its first string field becomes output column 0
+   * @param left Records of the left table for this key
+   * @param right Records of the right table for this key
+   * @param output Sink that receives one record per right-group record
+   */
+  @Override
+  public void join(Record key, Iterator<Record> left, Iterator<Record> right, Yieldable<Record> output) {
+    outputRecord.setString(0, key.getString(0));
+
+    // Nothing on the right side: nothing to emit.
+    if (!right.hasNext()) {
+      return;
+    }
+
+    // Nothing on the left side: pass every right record through unmerged.
+    if (!left.hasNext()) {
+      while (right.hasNext()) {
+        Record logRecord = right.next();
+        outputRecord.setBigint(1, logRecord.getDatetime(0).getTime());
+        outputRecord.setString(2, logRecord.getString(1));
+        output.yield(outputRecord);
+      }
+      return;
+    }
+
+    // The left iterator cannot be rewound, yet it has to be scanned once per
+    // right record, so keep a cloned copy of every left record.
+    ArrayList<Record> pays = new ArrayList<>();
+    while (left.hasNext()) {
+      pays.add(left.next().clone());
+    }
+
+    while (right.hasNext()) {
+      Record log = right.next();
+      long logTime = log.getDatetime(0).getTime();
+      Record nearestPay = findNearestPay(pays, logTime);
+      // Merge the log record with its nearest pay record and emit the result.
+      outputRecord.setBigint(1, logTime);
+      outputRecord.setString(2, mergeLog(nearestPay.getString(1), log.getString(1)));
+      output.yield(outputRecord);
+    }
+  }
+
+  /**
+   * Returns the record in {@code pays} whose datetime in column 0 has the
+   * smallest absolute distance to {@code logTime}; the earliest such record
+   * wins on ties.
+   */
+  private static Record findNearestPay(ArrayList<Record> pays, long logTime) {
+    Record nearest = null;
+    long smallestDelta = Long.MAX_VALUE;
+    for (Record candidate : pays) {
+      long delta = Math.abs(logTime - candidate.getDatetime(0).getTime());
+      if (delta < smallestDelta) {
+        smallestDelta = delta;
+        nearest = candidate;
+      }
+    }
+    return nearest;
+  }
+
+  /** Builds the merged content: the log text, then ", pay " and the pay info. */
+  String mergeLog(String payInfo, String logContent) {
+    return logContent + ", pay " + payInfo;
+  }
+
+  /** Nothing to release. */
+  @Override
+  public void close() {
+
+  }
+
+}

+ 240 - 0
examples/com/aliyun/odps/examples/unstructured/SpeechSentenceSnrExtractor.java

@@ -0,0 +1,240 @@
+package com.aliyun.odps.examples.unstructured;
+
+import com.aliyun.odps.Column;
+import com.aliyun.odps.OdpsType;
+import com.aliyun.odps.data.ArrayRecord;
+import com.aliyun.odps.data.Record;
+import com.aliyun.odps.io.InputStreamSet;
+import com.aliyun.odps.io.SourceInputStream;
+import com.aliyun.odps.udf.DataAttributes;
+import com.aliyun.odps.udf.ExecutionContext;
+import com.aliyun.odps.udf.Extractor;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import java.io.BufferedInputStream;
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.security.InvalidParameterException;
+import java.util.ArrayList;
+import java.util.HashMap;
+
+/**
+ * Extractor that turns every input wav file into one record holding the
+ * signal-to-noise ratio (in dB) of the utterance and the utterance id.
+ * The per-frame speech/noise labels are loaded from the resource file named
+ * by the "mlfFileName" attribute; the sampling rate is taken from the
+ * "speechSampleRateInKHz" attribute.
+ */
+public class SpeechSentenceSnrExtractor extends Extractor {
+  private final static Log logger = LogFactory.getLog(SpeechSentenceSnrExtractor.class);
+
+  private static final String MLF_FILE_ATTRIBUTE_KEY = "mlfFileName";
+  private static final String SPEECH_SAMPLE_RATE_KEY = "speechSampleRateInKHz";
+  // Prefix of the MLF lines that introduce a new utterance id.
+  private static final String MLF_ID_PREFIX = "id:";
+
+  private String mlfFileName;
+  private HashMap<String, UtteranceLabel> utteranceLabels;
+  private InputStreamSet inputs;
+  private DataAttributes attributes;
+  private double sampleRateInKHz;
+
+  public SpeechSentenceSnrExtractor(){
+    this.utteranceLabels = new HashMap<String, UtteranceLabel>();
+  }
+
+  /**
+   * Reads the two mandatory attributes and loads all utterance labels from
+   * the MLF resource.
+   *
+   * @throws IllegalArgumentException if either attribute is missing
+   * @throws RuntimeException if the MLF resource cannot be read
+   */
+  @Override
+  public void setup(ExecutionContext ctx, InputStreamSet inputs, DataAttributes attributes){
+    this.inputs = inputs;
+    this.attributes = attributes;
+    this.mlfFileName = this.attributes.getValueByKey(MLF_FILE_ATTRIBUTE_KEY);
+    if (this.mlfFileName == null){
+      throw new IllegalArgumentException("A mlf file must be specified in extractor attribute.");
+    }
+    String sampleRateInKHzStr = this.attributes.getValueByKey(SPEECH_SAMPLE_RATE_KEY);
+    if (sampleRateInKHzStr == null){
+      throw new IllegalArgumentException("The speech sampling rate must be specified in extractor attribute.");
+    }
+    this.sampleRateInKHz = Double.parseDouble(sampleRateInKHzStr);
+    // try-with-resources closes the resource stream even when parsing fails.
+    try (BufferedInputStream inputStream = ctx.readResourceFileAsStream(mlfFileName)) {
+      loadMlfLabelsFromResource(inputStream);
+    } catch (IOException e) {
+      // Keep the original exception as the cause so its stack trace survives.
+      throw new RuntimeException("reading model from mlf failed with exception " + e.getMessage(), e);
+    }
+  }
+
+  /**
+   * Processes the next input file.
+   *
+   * @return a (snr double, id string) record, or null when no input is left
+   * @throws IllegalArgumentException if the file exceeds Integer.MAX_VALUE
+   *         bytes, the output schema is not double|string, or the wav data
+   *         does not match its labels
+   */
+  @Override
+  public Record extract() throws IOException {
+    SourceInputStream inputStream = inputs.next();
+    if (inputStream == null){
+      return null;
+    }
+
+    String fileName;
+    ArrayRecord record;
+    byte[] buffer;
+    int readSize;
+    try {
+      fileName = inputStream.getFileName();
+      fileName = fileName.substring(fileName.lastIndexOf('/') + 1);
+      logger.info("Processing wav file " + fileName);
+
+      // Validate the output schema before allocating a file-sized buffer.
+      Column[] outputColumns = this.attributes.getRecordColumns();
+      if (outputColumns.length != 2 || outputColumns[0].getType() != OdpsType.DOUBLE
+          || outputColumns[1].getType() != OdpsType.STRING){
+        throw new IllegalArgumentException("Expecting output to of schema double|string.");
+      }
+      record = new ArrayRecord(outputColumns);
+
+      long fileSize = inputStream.getFileSize();
+      if (fileSize > Integer.MAX_VALUE){
+        // technically a larger file can be read via multiple batches,
+        // but we simply do not support it in this example.
+        throw new IllegalArgumentException("Do not support speech file larger than 2G bytes");
+      }
+      buffer = new byte[(int)fileSize];
+      readSize = inputStream.readToEnd(buffer);
+    } finally {
+      // Release the input stream on the failure paths as well.
+      inputStream.close();
+    }
+
+    String id = utteranceIdOf(fileName);
+    double snr = computeSnr(id, buffer, readSize);
+    record.setDouble(0, snr);
+    record.setString(1, id);
+    logger.info(String.format("file [%s] snr computed to be [%f]db", fileName, snr));
+    return record;
+  }
+
+  @Override
+  public void close(){
+    //no-op
+  }
+
+  /**
+   * Derives the utterance id from a bare file name: XXX.wav => XXX. A name
+   * without any '.' is used unchanged instead of failing in substring().
+   */
+  private static String utteranceIdOf(String fileName){
+    int dot = fileName.lastIndexOf('.');
+    return dot < 0 ? fileName : fileName.substring(0, dot);
+  }
+
+  /**
+   * Parses the MLF content: a line starting with "id:" names the utterance
+   * that the following label line belongs to; blank lines are ignored.
+   * The reader is not closed here; the caller owns the stream.
+   */
+  private void loadMlfLabelsFromResource(BufferedInputStream fileInputStream)
+      throws IOException {
+    BufferedReader br = new BufferedReader(new InputStreamReader(fileInputStream));
+    String rawLine;
+    String id = "";
+    // here we rely on the particular format of the mlf to load labels from the file
+    while ((rawLine = br.readLine()) != null) {
+      // Trim once so indented id/label lines are recognised and parsed.
+      String line = rawLine.trim();
+      if (line.isEmpty()){
+        continue;
+      }
+      if (line.startsWith(MLF_ID_PREFIX)){
+        // Everything after the prefix is the id: safe for a bare "id:" line
+        // and for ids that themselves contain ':'.
+        id = line.substring(MLF_ID_PREFIX.length()).trim();
+      }
+      else{
+        // in this branch, line must be the label
+        this.utteranceLabels.put(id, new UtteranceLabel(id, line, " "));
+      }
+    }
+  }
+
+  /**
+   * Computes the snr of the speech sentence, assuming the input buffer
+   * contains the entire content of a wav file (44-byte header followed by
+   * little-endian 16-bit samples).
+   *
+   * @param id utterance id used to look up the frame labels
+   * @param buffer wav file content
+   * @param validBufferLen number of valid bytes in {@code buffer}
+   * @return 10*log10(mean speech energy / mean noise energy); 100 when no
+   *         frame is labelled noise, -100 when no frame is labelled speech
+   */
+  private double computeSnr(String id, byte[] buffer, int validBufferLen){
+    final int headerLength = 44;
+    if (validBufferLen < headerLength){
+      throw new IllegalArgumentException("A wav buffer must be at least larger than standard wav header size.");
+    }
+    // each frame is 10 ms; multiply before converting to int so that a
+    // fractional rate such as 44.1 kHz gives 441 samples, not 440
+    int sampleCountPerFrame = (int) Math.round(this.sampleRateInKHz * 10);
+    if (sampleCountPerFrame <= 0){
+      // would otherwise surface as a division by zero below
+      throw new IllegalArgumentException(
+          String.format("Invalid speech sampling rate %f kHz.", this.sampleRateInKHz));
+    }
+    // each data point denoted by a short integer (2 bytes)
+    int dataLen = (validBufferLen - headerLength) / 2;
+
+    if (dataLen % sampleCountPerFrame != 0){
+      throw new IllegalArgumentException(
+          String.format("Invalid wav file where dataLen %d does not divide sampleCountPerFrame %d",
+              dataLen, sampleCountPerFrame));
+    }
+    // total number of frames in the wav file
+    int frameCount = dataLen / sampleCountPerFrame;
+
+    UtteranceLabel utteranceLabel = this.utteranceLabels.get(id);
+    if (utteranceLabel == null){
+      throw new IllegalArgumentException(String.format("Cannot find label of id %s from MLF.", id));
+    }
+    ArrayList<Long> labels = utteranceLabel.getLabels();
+    // usually frameCount should be larger than labels.size() by a small margin
+    // in our sample data, this margin is 2.
+    if (labels.size()  + 2 != frameCount){
+      throw new IllegalArgumentException(String.format("Mismatched frame labels size %d and frameCount %d.",
+          labels.size() + 2, frameCount ));
+    }
+
+    // energy of a frame = sum of its squared samples
+    int offset = headerLength;
+    short data[] = new short[sampleCountPerFrame];
+    double energies[] = new double[frameCount];
+    for (int i = 0; i < frameCount; i++ ){
+      ByteBuffer.wrap(buffer, offset, sampleCountPerFrame * 2)
+          .order(ByteOrder.LITTLE_ENDIAN).asShortBuffer().get(data);
+      double frameEnergy = 0;
+      for (int j = 0; j < sampleCountPerFrame; j++){
+        frameEnergy += data[j] * data[j];
+      }
+      energies[i] = frameEnergy;
+      offset += sampleCountPerFrame * 2;
+    }
+
+    double averageSpeechPower = 0;
+    // small positive start value keeps the final division away from zero
+    double averageNoisePower  = 0.00000001;
+    int speechframeCount = 0;
+    int noiseframeCount = 0;
+
+    // label 0 marks a noise frame, any other label a speech frame
+    for (int i = 0; i < labels.size(); i++){
+      if (labels.get(i) == 0){
+        averageNoisePower += energies[i];
+        noiseframeCount++;
+      } else {
+        averageSpeechPower += energies[i];
+        speechframeCount++;
+      }
+    }
+
+    if (noiseframeCount > 0){
+      averageNoisePower /= noiseframeCount;
+    } else {
+      // no noise, pure speech snr = max of 100db
+      return 100;
+    }
+
+    if (speechframeCount > 0) {
+      averageSpeechPower /= speechframeCount;
+    } else {
+      // no speech, pure noise, snr = min  of -100db
+      return -100;
+    }
+
+    return 10 * Math.log10(averageSpeechPower/averageNoisePower);
+  }
+}
+
+
+/**
+ * Frame labels of one utterance, parsed from a label line of the form
+ * "index frameCount label label ...". Every raw label is reduced to 0 (raw
+ * value within 2057..2059) or 1 (any other value).
+ */
+class UtteranceLabel {
+  private String id; // id is the same as file name
+  private ArrayList<Long> labels;
+  private long labelIndex;
+  private long frameCount;
+
+  /**
+   * @param id utterance id
+   * @param labelString the label line
+   * @param labelDelimiter separator between the fields; handed to String#split
+   * @throws InvalidParameterException if fewer than two fields are present or
+   *         the number of labels differs from the declared frame count
+   * @throws NumberFormatException if a field is not a valid long
+   */
+  UtteranceLabel(String id, String labelString, String labelDelimiter){
+    this.labels = new ArrayList<Long>();
+    this.id = id;
+
+    final String[] tokens = labelString.split(labelDelimiter);
+    if (tokens.length < 2){
+      throw new InvalidParameterException("Invalid label line: at least index and length should be provided.");
+    }
+
+    // The first two fields are the header: label index and frame count.
+    this.labelIndex = Long.parseLong(tokens[0]);
+    this.frameCount = Long.parseLong(tokens[1]);
+    if (tokens.length - 2 != this.frameCount){
+      throw new InvalidParameterException("Label length mismatches label header meta.");
+    }
+
+    // Normalize each entry to voice/non-voice; only that is needed for the snr computation.
+    int position = 2;
+    while (position < tokens.length){
+      long raw = Long.parseLong(tokens[position]);
+      boolean inZeroRange = raw >= 2057 && raw <= 2059;
+      labels.add(inZeroRange ? 0L : 1L);
+      position++;
+    }
+  }
+
+  /** Returns the utterance id given at construction. */
+  public String getId(){
+    return this.id;
+  }
+
+  /** Returns the normalized labels (the internal list, not a copy). */
+  public ArrayList<Long> getLabels(){
+    return labels;
+  }
+}

+ 18 - 0
examples/com/aliyun/odps/examples/unstructured/SpeechStorageHandler.java

@@ -0,0 +1,18 @@
+package com.aliyun.odps.examples.unstructured;
+
+import com.aliyun.odps.udf.Extractor;
+import com.aliyun.odps.udf.OdpsStorageHandler;
+import com.aliyun.odps.udf.Outputer;
+
+/**
+ * Storage handler that supplies {@link SpeechSentenceSnrExtractor} as its extractor.
+ * No outputer is provided: asking for one throws.
+ */
+public class SpeechStorageHandler extends OdpsStorageHandler {
+
+  /**
+   * Not supported by this handler.
+   *
+   * @throws UnsupportedOperationException always
+   */
+  @Override
+  public Class<? extends Outputer> getOutputerClass() {
+    throw new UnsupportedOperationException();
+  }
+
+  /** @return the extractor implementation class, {@link SpeechSentenceSnrExtractor}. */
+  @Override
+  public Class<? extends Extractor> getExtractorClass() {
+    return SpeechSentenceSnrExtractor.class;
+  }
+}

+ 140 - 0
examples/com/aliyun/odps/examples/unstructured/TextExtractor.java

@@ -0,0 +1,140 @@
+package com.aliyun.odps.examples.unstructured;
+
+import com.aliyun.odps.Column;
+import com.aliyun.odps.data.ArrayRecord;
+import com.aliyun.odps.data.Record;
+import com.aliyun.odps.io.InputStreamSet;
+import com.aliyun.odps.udf.DataAttributes;
+import com.aliyun.odps.udf.ExecutionContext;
+import com.aliyun.odps.udf.Extractor;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+
+/**
+ * Text extractor that extracts schematized records from formatted plain text (csv, tsv etc.).
+ * <p>
+ * Lines are read from each stream of the supplied {@link InputStreamSet} in turn, split on the
+ * column delimiter, and the needed columns are converted to the types of the record columns
+ * reported by the {@link DataAttributes}.
+ */
+public class TextExtractor extends Extractor {
+
+  private InputStreamSet inputs;
+  private String columnDelimiter;
+  private DataAttributes attributes;
+  private BufferedReader currentReader;
+  private boolean firstRead = true;
+
+  /** Creates an extractor that splits on "," until {@link #setup} supplies another delimiter. */
+  public TextExtractor() {
+    // default to ",", this can be overwritten if a specific delimiter is provided (via DataAttributes)
+    this.columnDelimiter = ",";
+  }
+
+  /**
+   * Stores the inputs and attributes and picks up an optional "delimiter" attribute.
+   * The execution context is not used in this example.
+   */
+  @Override
+  public void setup(ExecutionContext ctx, InputStreamSet inputs, DataAttributes attributes) {
+    this.inputs = inputs;
+    this.attributes = attributes;
+    // check if "delimiter" attribute is supplied via SQL query
+    String columnDelimiter = this.attributes.getValueByKey("delimiter");
+    if (columnDelimiter != null) {
+      this.columnDelimiter = columnDelimiter;
+    }
+    System.out.println("TextExtractor using delimiter [" + this.columnDelimiter + "].");
+    // note: more properties can be inited from attributes if needed
+  }
+
+  /**
+   * @return the record built from the next input line, or {@code null} once every input stream
+   *         has been exhausted
+   */
+  @Override
+  public Record extract() throws IOException {
+    String line = readNextLine();
+    if (line == null) {
+      return null;
+    }
+    return textLineToRecord(line);
+  }
+
+  @Override
+  public void close(){
+    // no-op
+  }
+
+  /**
+   * Converts one text line into a record holding the needed columns.
+   *
+   * @throws IllegalArgumentException if no needed indexes are supplied, if their count differs
+   *                                  from the number of record columns, or if a column has an
+   *                                  unsupported type
+   * @throws NumberFormatException    if a BIGINT or DOUBLE field cannot be parsed
+   */
+  private Record textLineToRecord(String line) throws IllegalArgumentException
+  {
+    Column[] outputColumns = this.attributes.getRecordColumns();
+    ArrayRecord record = new ArrayRecord(outputColumns);
+    if (outputColumns.length != 0){
+      // string copies are needed, not the most efficient one, but suffice as an example here
+      // note: String.split interprets the delimiter as a regular expression
+      String[] parts = line.split(columnDelimiter);
+      int[] outputIndexes = this.attributes.getNeededIndexes();
+      if (outputIndexes == null){
+        throw new IllegalArgumentException("No outputIndexes supplied.");
+      }
+      if (outputIndexes.length != outputColumns.length){
+        // report the value that was actually compared (the number of needed indexes),
+        // not the number of fields found in the line
+        throw new IllegalArgumentException("Mismatched output schema: Expecting "
+            + outputColumns.length + " columns but get " + outputIndexes.length);
+      }
+      int index = 0;
+      for(int i = 0; i < parts.length; i++){
+        // only parse data in columns indexed by output indexes
+        if (index < outputIndexes.length && i == outputIndexes[index]){
+          switch (outputColumns[index].getType()) {
+            case STRING:
+              record.setString(index, parts[i]);
+              break;
+            case BIGINT:
+              record.setBigint(index, Long.parseLong(parts[i]));
+              break;
+            case BOOLEAN:
+              record.setBoolean(index, Boolean.parseBoolean(parts[i]));
+              break;
+            case DOUBLE:
+              record.setDouble(index, Double.parseDouble(parts[i]));
+              break;
+            case DATETIME:
+            case DECIMAL:
+            case ARRAY:
+            case MAP:
+            default:
+              throw new IllegalArgumentException("Type " + outputColumns[index].getType() + " not supported for now.");
+          }
+          index++;
+        }
+      }
+    }
+    return record;
+  }
+
+  /**
+   * Read next line from underlying input streams.
+   * @return The next line as String object. If all of the contents of input
+   * streams has been read, return null.
+   */
+  private String readNextLine() throws IOException {
+    if (firstRead) {
+      firstRead = false;
+      // the first read, initialize things
+      currentReader = moveToNextStream();
+      if (currentReader == null) {
+        // empty input stream set
+        return null;
+      }
+    }
+    while (currentReader != null) {
+      String line = currentReader.readLine();
+      if (line != null) {
+        return line;
+      }
+      // current stream is exhausted, continue with the next one (if any)
+      currentReader = moveToNextStream();
+    }
+    return null;
+  }
+
+  /**
+   * @return a reader over the next input stream, or {@code null} when the set has no more streams
+   */
+  private BufferedReader moveToNextStream() throws IOException {
+    InputStream stream = inputs.next();
+    if (stream == null) {
+      return null;
+    } else {
+      return new BufferedReader(new InputStreamReader(stream));
+    }
+  }
+}

+ 56 - 0
examples/com/aliyun/odps/examples/unstructured/TextOutputer.java

@@ -0,0 +1,56 @@
+package com.aliyun.odps.examples.unstructured;
+
+import com.aliyun.odps.data.Record;
+import com.aliyun.odps.io.OutputStreamSet;
+import com.aliyun.odps.io.SinkOutputStream;
+import com.aliyun.odps.udf.DataAttributes;
+import com.aliyun.odps.udf.ExecutionContext;
+import com.aliyun.odps.udf.Outputer;
+
+import java.io.IOException;
+
+public class TextOutputer extends Outputer {
+  private SinkOutputStream outputStream;
+  private DataAttributes attributes;
+  private String delimiter;
+
+  public TextOutputer (){
+    // default delimiter, this can be overwritten if a delimiter is provided through the attributes.
+    this.delimiter = "|";
+  }
+
+  @Override
+  public void output(Record record) throws IOException {
+    this.outputStream.write(recordToString(record).getBytes());
+  }
+
+  // no particular usage of execution context in this example
+  @Override
+  public void setup(ExecutionContext ctx, OutputStreamSet outputStreamSet, DataAttributes attributes) throws IOException {
+    this.outputStream = outputStreamSet.next();
+    this.attributes = attributes;
+  }
+
+  @Override
+  public void close() {
+    // no-op
+  }
+
+  private String recordToString(Record record){
+    StringBuilder sb = new StringBuilder();
+    for (int i = 0; i < record.getColumnCount(); i++)
+    {
+      if (null == record.get(i)){
+        sb.append("NULL");
+      }
+      else{
+        sb.append(record.get(i).toString());
+      }
+      if (i != record.getColumnCount() - 1){
+        sb.append(this.delimiter);
+      }
+    }
+    sb.append("\n");
+    return sb.toString();
+  }
+}

+ 18 - 0
examples/com/aliyun/odps/examples/unstructured/TextStorageHandler.java

@@ -0,0 +1,18 @@
+package com.aliyun.odps.examples.unstructured;
+
+import com.aliyun.odps.udf.Extractor;
+import com.aliyun.odps.udf.OdpsStorageHandler;
+import com.aliyun.odps.udf.Outputer;
+
+/**
+ * Storage handler for plain-text data: pairs {@link TextExtractor} for reading with
+ * {@link TextOutputer} for writing.
+ */
+public class TextStorageHandler extends OdpsStorageHandler {
+
+  /** @return the outputer implementation class, {@link TextOutputer}. */
+  @Override
+  public Class<? extends Outputer> getOutputerClass() {
+    return TextOutputer.class;
+  }
+
+  /** @return the extractor implementation class, {@link TextExtractor}. */
+  @Override
+  public Class<? extends Extractor> getExtractorClass() {
+    return TextExtractor.class;
+  }
+}

+ 111 - 0
examples/com/aliyun/odps/examples/unstructured/test/ExtractorTest.java

@@ -0,0 +1,111 @@
+package com.aliyun.odps.examples.unstructured.test;
+
+import com.aliyun.odps.Column;
+import com.aliyun.odps.data.ArrayRecord;
+import com.aliyun.odps.data.Record;
+import com.aliyun.odps.examples.TestUtil;
+import com.aliyun.odps.examples.unstructured.SpeechSentenceSnrExtractor;
+import com.aliyun.odps.examples.unstructured.TextExtractor;
+import com.aliyun.odps.udf.local.runner.ExtractorRunner;
+import com.aliyun.odps.udf.local.util.LocalDataAttributes;
+import com.aliyun.odps.udf.local.util.UnstructuredUtils;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Local-runner tests for the example extractors: {@link TextExtractor} over csv files and
+ * {@link SpeechSentenceSnrExtractor} over wav files.
+ */
+public class ExtractorTest {
+  private final String ambulanceFullSchema =
+      "vehicle:bigint;id:bigint;patient:bigint;calls:bigint;latitude:double;longitude:double;time:string;direction:string";
+  private final String speechDataFullSchema = "sentence_snr:double;id:string";
+
+  @BeforeClass
+  public static void initWarehouse() {
+    TestUtil.initWarehouse();
+  }
+
+  @Test
+  public void testTextExtractor() throws Exception {
+    /*
+     * Equivalent to the following SQL:
+     CREATE EXTERNAL TABLE  ambulance_data_external
+     ( vehicle bigint, id bigint, patient bigint, calls bigint,
+     Latitude double, Longitude double, time string, direction string)
+     STORED BY 'com.aliyun.odps.examples.unstructured.TextStorageHandler'
+     LOCATION 'oss://.../data/ambulance_csv/'
+     USING 'jar_file_name.jar';
+
+     SELECT * FROM ambulance_data_external;
+     */
+    Column[] externalTableSchema = UnstructuredUtils.parseSchemaString(ambulanceFullSchema);
+    // note: default delimiter used in TextExtractor is ','
+    LocalDataAttributes attributes = new LocalDataAttributes(null, externalTableSchema);
+    ExtractorRunner runner = new ExtractorRunner(TestUtil.getOdps(), new TextExtractor(), attributes);
+    // using local file directory to mock data source
+    runner.feedDirectory(TestUtil.class.getResource("/data/ambulance_csv/").getPath());
+    List<Record> records = runner.yieldRecords();
+    // do verification below (JUnit expects the expected value first)
+    Assert.assertEquals(15, records.size());
+    ArrayRecord record0 = new ArrayRecord(externalTableSchema);
+    record0.set(0, (long)1);
+    record0.set(1, (long)1);
+    record0.set(2, (long)51);
+    record0.set(3, (long)1);
+    record0.set(4, 46.81006);
+    record0.set(5, -92.08174);
+    record0.set(6, "9/14/2014 0:00");
+    record0.set(7, "S");
+    Assert.assertTrue(UnstructuredUtils.recordsEqual(record0, records.get(0)));
+  }
+
+  @Test
+  public void testSpeechExtraction() throws Exception {
+    /*
+     * Equivalent to the following SQL:
+     CREATE EXTERNAL TABLE speech_snr_external
+     (sentence_snr double, id string)
+     STORED BY 'com.aliyun.odps.examples.unstructured.SpeechStorageHandler'
+     WITH SERDEPROPERTIES ('mlfFileName'='speech_model_random_5_utterance' , 'speechSampleRateInKHz' = '16')
+     LOCATION 'oss://.../data/speech_wav/'
+     USING 'jar_file_name.jar';
+
+     SELECT * FROM speech_snr_external;
+     */
+    Column[] externalTableSchema = UnstructuredUtils.parseSchemaString(speechDataFullSchema);
+    Map<String, String> userProperties = new HashMap<String, String>();
+    // a file resource
+    userProperties.put("mlfFileName", "speech_model_random_5_utterance");
+    // an extractor parameter
+    userProperties.put("speechSampleRateInKHz", "16");
+    LocalDataAttributes attributes = new LocalDataAttributes(userProperties, externalTableSchema);
+    // SpeechSentenceSnrExtractor will analyze a speech wav file and output
+    // 1. the average sentence snr of a wav file
+    // 2. the corresponding wav file name
+    ExtractorRunner runner = new ExtractorRunner(TestUtil.getOdps(), new SpeechSentenceSnrExtractor(), attributes);
+
+    runner.feedDirectory(TestUtil.class.getResource("/data/speech_wav/").getPath());
+    List<Record> records = runner.yieldRecords();
+
+    // do verification below (JUnit expects the expected value first)
+    Assert.assertEquals(3, records.size());
+
+    ArrayRecord record0 = new ArrayRecord(externalTableSchema);
+    record0.set(0, 31.39050062838079);
+    record0.set(1, "tsh148_seg_2_3013_3_6_48_80bd359827e24dd7_0");
+    Assert.assertTrue(UnstructuredUtils.recordsEqual(record0, records.get(0)));
+
+    ArrayRecord record1 = new ArrayRecord(externalTableSchema);
+    record1.set(0, 35.477360745366035);
+    record1.set(1, "tsh148_seg_3013_1_31_11_9d7c87aef9f3e559_0");
+    Assert.assertTrue(UnstructuredUtils.recordsEqual(record1, records.get(1)));
+
+    ArrayRecord record2 = new ArrayRecord(externalTableSchema);
+    record2.set(0, 16.046150955268665);
+    record2.set(1, "tsh148_seg_3013_2_29_49_f4cb0990a6b4060c_0");
+    Assert.assertTrue(UnstructuredUtils.recordsEqual(record2, records.get(2)));
+  }
+
+}

+ 133 - 0
examples/com/aliyun/odps/examples/unstructured/test/OutputerTest.java

@@ -0,0 +1,133 @@
+package com.aliyun.odps.examples.unstructured.test;
+
+import com.aliyun.odps.Column;
+import com.aliyun.odps.data.ArrayRecord;
+import com.aliyun.odps.data.Record;
+import com.aliyun.odps.examples.TestUtil;
+import com.aliyun.odps.udf.example.text.TextOutputer;
+import com.aliyun.odps.udf.local.runner.OutputerRunner;
+import com.aliyun.odps.udf.local.util.LocalDataAttributes;
+import com.aliyun.odps.udf.local.util.UnstructuredUtils;
+import com.aliyun.odps.utils.StringUtils;
+import org.junit.*;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Local-runner tests for TextOutputer: records are fed to an OutputerRunner and the files
+ * produced in a fresh temporary directory are checked.
+ */
+public class OutputerTest {
+
+  private String simpleTableSchema = "a:bigint;b:double;c:string";
+  private String adsLogTableSchema = "AdId:BIGINT;Rand:DOUBLE;AdvertiserName:STRING;Comment:STRING";
+  private File outputDirectory = null;
+
+  @BeforeClass
+  public static void initWarehouse() {
+    TestUtil.initWarehouse();
+  }
+
+  @Before
+  public void before() throws IOException{
+    // output directory preparation
+    outputDirectory = new File("temp/" + UnstructuredUtils.generateOutputName());
+    outputDirectory.delete();
+    outputDirectory.mkdirs();
+  }
+
+  @Test
+  public void testOutputSimpleText() throws Exception {
+    /*
+     * Test outputting manually constructed records to text
+     */
+    Column[] externalTableSchema = UnstructuredUtils.parseSchemaString(simpleTableSchema);
+    LocalDataAttributes attributes = new LocalDataAttributes(null, externalTableSchema);
+    // TextOutputer will output one single file
+    OutputerRunner runner = new OutputerRunner(TestUtil.getOdps(), new TextOutputer(), attributes);
+    List<Record> records = new ArrayList<Record>();
+    records.add(new ArrayRecord(externalTableSchema, new Object[]{(long)1, 2.5, "row0"}));
+    records.add(new ArrayRecord(externalTableSchema, new Object[]{(long)1234567, 8.88, "row1"}));
+    records.add(new ArrayRecord(externalTableSchema, new Object[]{(long)12, 123.1, "testrow"}));
+    // run outputer
+    runner.feedRecords(records);
+    runner.yieldTo(outputDirectory.getAbsolutePath());
+
+    String expectedOutput = "1|2.5|row0\n" +
+        "1234567|8.88|row1\n" +
+        "12|123.1|testrow\n";
+
+    verifySingleFileOutput(expectedOutput);
+  }
+
+  @Test
+  public void testOutputSpecialText() throws Exception {
+    /*
+     * Test reading from internal table and outputting to text file, with a user defined delimiter.
+     * Equivalent to the following SQL:
+     *
+     CREATE EXTERNAL TABLE ads_log_external
+     (AdId bigint, Rand double,
+     AdvertiserName string, Comment string)
+     STORED BY 'com.aliyun.odps.udf.example.text.TextStorageHandler'
+     WITH SERDEPROPERTIES ('delimiter'='\t')
+     LOCATION 'oss://path/to/output/'
+     USING 'jar_file_name.jar';
+
+     INSERT OVERWRITE ads_log_external SELECT * FROM ads_log;
+     * Here ads_log is an internal table (locally defined in warehouse directory)
+     */
+    Column[] externalTableSchema = UnstructuredUtils.parseSchemaString(adsLogTableSchema);
+    Map<String, String> userProperties = new HashMap<String, String>();
+    userProperties.put("delimiter", "\t");
+    LocalDataAttributes attributes = new LocalDataAttributes(userProperties, externalTableSchema);
+    // TextOutputer outputs one single file
+    OutputerRunner runner = new OutputerRunner(TestUtil.getOdps(), new TextOutputer(), attributes);
+    String internalTableName = "ads_log";
+    // We are doing SELECT * FROM here, so the two tables have the same schema
+    Column[] internalTableSchema = externalTableSchema;
+
+    List<Record> records = new ArrayList<Record>();
+    Record record;
+    while ((record = UnstructuredUtils.readFromInternalTable("example_project", internalTableName,
+        internalTableSchema, null)) != null){
+      records.add(record.clone());
+    }
+    // run outputer
+    runner.feedRecords(records);
+    runner.yieldTo(outputDirectory.getAbsolutePath());
+
+    String expectedOutput = "399266\t0.5\tDoritos\twhat is up\n" +
+        "399266\t0.0\tTacobell\thello!\n" +
+        "382045\t-76.0\tVoelkl\trandom comments\n" +
+        "382045\t6.4\tWhistler Resort\ta\n" +
+        "106479\t98.7\tAmazon Prime\tbdcd\n" +
+        "906441\t-9865788.2\tHayden Planetarium\tplatium\n" +
+        "351530\t0.005\tMicrosoft Azure Services\ttst\n";
+
+    verifySingleFileOutput(expectedOutput);
+  }
+
+  /** Verifies that exactly one output file exists and that it matches {@code expectedOutput}. */
+  private void verifySingleFileOutput(String expectedOutput) throws IOException {
+    verifyFilesOutput(new String[]{expectedOutput});
+  }
+
+  /**
+   * Verifies the number of output files and, per file, the number of rows.
+   * Files are compared with the expectations in the order returned by {@link File#listFiles()}.
+   */
+  private void verifyFilesOutput(String[] expectedOutputs) throws IOException {
+    File[] outputs = outputDirectory.listFiles();
+    // listFiles() returns null when the directory cannot be listed
+    Assert.assertNotNull("Cannot list output directory " + outputDirectory, outputs);
+    Assert.assertEquals(expectedOutputs.length, outputs.length);
+    for (int i = 0; i < outputs.length; i++){
+      String content = readWholeFile(outputs[i]);
+      String[] rows = StringUtils.split(content, '\n');
+      String[] expectedRows = StringUtils.split(expectedOutputs[i], '\n');
+      // due to double presentation accuracy difference, the output may not exactly match expected,
+      // therefore we only verify that numbers of rows match.
+      Assert.assertEquals(expectedRows.length, rows.length);
+    }
+  }
+
+  /**
+   * Reads the complete content of {@code file}; the stream is always closed and short reads
+   * are retried until the whole file has been consumed.
+   */
+  private static String readWholeFile(File file) throws IOException {
+    byte[] data = new byte[(int)file.length()];
+    int filled = 0;
+    try (FileInputStream fis = new FileInputStream(file)) {
+      while (filled < data.length) {
+        int count = fis.read(data, filled, data.length - filled);
+        if (count < 0) {
+          break; // file ended earlier than its reported length
+        }
+        filled += count;
+      }
+    }
+    return new String(data, 0, filled);
+  }
+}

+ 6 - 0
examples/data/ambulance_csv/1.csv

@@ -0,0 +1,6 @@
+1,1,51,1,46.81006,-92.08174,9/14/2014 0:00,S
+1,2,13,1,46.81006,-92.08174,9/14/2014 0:00,NE
+1,3,48,1,46.81006,-92.08174,9/14/2014 0:00,NE
+1,4,30,1,46.81006,-92.08174,9/14/2014 0:00,W
+1,5,47,1,46.81006,-92.08174,9/14/2014 0:00,S
+1,6,9,1,46.81006,-92.08174,9/14/2014 0:00,S

+ 9 - 0
examples/data/ambulance_csv/2.csv

@@ -0,0 +1,9 @@
+1,1,40,1,46.81006,-92.08174,9/15/2014 0:00,NE
+1,2,33,1,46.81006,-92.08174,9/15/2014 0:00,NE
+1,3,60,1,46.81006,-92.08174,9/15/2014 0:00,NW
+1,4,50,1,46.81006,-92.08174,9/15/2014 0:00,SW
+1,5,50,1,46.81006,-92.08174,9/15/2014 0:00,S
+1,6,53,1,46.81006,-92.08174,9/15/2014 0:00,NE
+1,7,60,1,46.81006,-92.08174,9/15/2014 0:00,NE
+1,8,75,1,46.81006,-92.08174,9/15/2014 0:00,E
+1,9,75,1,46.81006,-92.08174,9/15/2014 0:00,E

BIN
examples/data/speech_wav/tsh148_seg_2_3013_3_6_48_80bd359827e24dd7_0.wav


BIN
examples/data/speech_wav/tsh148_seg_3013_1_31_11_9d7c87aef9f3e559_0.wav


BIN
examples/data/speech_wav/tsh148_seg_3013_2_29_49_f4cb0990a6b4060c_0.wav


+ 59 - 0
pom.xml

@@ -0,0 +1,59 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+
+    <groupId>com.aliyun.odps.myJava</groupId>
+    <artifactId>elab_udf</artifactId>
+    <version>1.0-SNAPSHOT</version>
+    <dependencies>
+        <dependency>
+            <groupId>com.aliyun.odps</groupId>
+            <artifactId>odps-sdk-core</artifactId>
+            <version>${sdk.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>com.aliyun.odps</groupId>
+            <artifactId>odps-sdk-udf</artifactId>
+            <version>${sdk.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>com.aliyun.odps</groupId>
+            <artifactId>odps-udf-local</artifactId>
+            <version>${sdk.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>com.aliyun.odps</groupId>
+            <artifactId>odps-sdk-mapred</artifactId>
+            <version>${sdk.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>com.aliyun.odps</groupId>
+            <artifactId>odps-mapred-local</artifactId>
+            <version>${sdk.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>com.aliyun.odps</groupId>
+            <artifactId>odps-sdk-graph</artifactId>
+            <version>${sdk.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>com.aliyun.odps</groupId>
+            <artifactId>odps-graph-local</artifactId>
+            <version>${sdk.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>junit</groupId>
+            <artifactId>junit</artifactId>
+            <version>4.12</version>
+        </dependency>
+    </dependencies>
+    <properties>
+        <sdk.version>0.35.5-public</sdk.version>
+        <maven.compiler.source>1.8</maven.compiler.source>
+        <maven.compiler.target>1.8</maven.compiler.target>
+    </properties>
+
+
+</project>

+ 287 - 0
src/main/java/com/alibaba/dataworks/udtf/AdviserScoreUDTF.java

@@ -0,0 +1,287 @@
+package com.alibaba.dataworks.udtf;
+import java.text.DateFormat;
+import java.text.SimpleDateFormat;
+import java.util.Date;
+import com.aliyun.odps.udf.ExecutionContext;
+import com.aliyun.odps.udf.UDFException;
+import com.aliyun.odps.udf.UDTF;
+import com.aliyun.odps.udf.annotation.Resolve;
+import com.aliyun.odps.utils.StringUtils;
+/**
+ * UDTF that condenses the aggregated activity of one adviser into six indicators and a score.
+ * <p>
+ * Input (all strings): brand id, house id, user id, "~"-separated indicator ids,
+ * "~"-separated count data and "~"-separated value data. The position of an indicator id
+ * selects the entry of the count/value data that belongs to it.
+ * <p>
+ * Output (all strings): brand id, house id, user id, the six indicators, the score, an empty
+ * placeholder column and the current date formatted as yyyy-MM-dd.
+ */
+@Resolve({"string, string, string, string, string, string -> string, string, string, string, string, string, string, string, string, string, string,string"})
+public class AdviserScoreUDTF extends UDTF {
+
+    @Override
+    public void setup(ExecutionContext ctx) throws UDFException {
+        // nothing to initialize
+    }
+
+    /**
+     * Computes the indicators for one input row and forwards a single output row.
+     *
+     * @throws NumberFormatException if a count or value entry that must be numeric is not
+     */
+    @Override
+    public void process(Object[] args) throws UDFException {
+        Adviser adviser = new Adviser(args);
+        // total number of IM messages sent
+        int imMessageTotal = 0;
+        // average call duration of video house viewings (seconds)
+        double videoCallAvg = 0.0;
+        // average response time of video house viewings
+        double videoResponseAvg = 0.0;
+        // average phone call duration (never populated by the current logic)
+        double phoneCallAvg = 0.0;
+        // total number of order-grab clicks (never populated by the current logic)
+        int grabClickTotal = 0;
+        // average control score
+        double controlScoreAvg = 0.0;
+
+        String[] ids = adviser.getIdValuesList();
+        String[] counts = adviser.getCountDatasList();
+        String[] values = adviser.getValueDatasList();
+        for (int index = 0; index < ids.length; index++) {
+            String idValue = ids[index];
+            if ("response_time".equals(idValue)) {
+                // the adviser's average response time
+                if (index < values.length) {
+                    videoResponseAvg = Double.parseDouble(values[index]);
+                }
+            } else if ("clk_2b_25".equals(idValue) || "clk_2cmina_27".equals(idValue)) {
+                // total number of IM messages sent
+                if (index < counts.length) {
+                    for (String count : counts[index].split(",")) {
+                        imMessageTotal += Integer.parseInt(count);
+                    }
+                }
+            } else if ("im_call".equals(idValue)) {
+                // average call duration of video house viewings
+                if (index < values.length) {
+                    String[] videoList = values[index].split(",");
+                    for (String time : videoList) {
+                        videoCallAvg += diffTime(time);
+                    }
+                    if (videoCallAvg > 0) {
+                        videoCallAvg = videoCallAvg / videoList.length;
+                    }
+                }
+            } else if ("adviser_score".equals(idValue)) {
+                // average control score
+                if (index < values.length) {
+                    String[] adviserScoreList = values[index].split(",");
+                    for (String adviserScore : adviserScoreList) {
+                        controlScoreAvg += Double.parseDouble(adviserScore);
+                    }
+                    if (controlScoreAvg > 0 && adviserScoreList.length > 0) {
+                        controlScoreAvg = controlScoreAvg / adviserScoreList.length;
+                    }
+                }
+            }
+        }
+        String score = score(imMessageTotal, videoCallAvg, videoResponseAvg, phoneCallAvg, grabClickTotal, controlScoreAvg);
+        forward(adviser.getBrandId(), adviser.getHouseId(), adviser.getUserId(), imMessageTotal + "", videoCallAvg + "", videoResponseAvg + "", phoneCallAvg + "", grabClickTotal + "", controlScoreAvg + "", score, "", date());
+
+    }
+
+    /**
+     * Adds up the six sub-scores (at most 20 points each) and rescales the 0-120 total to
+     * 0-100 using integer arithmetic.
+     */
+    private String score(int one, double two, double three, double four, int five, double six) {
+        int total = imInfoScore(one) + videoSeeHouseCallTime(two) + videoSeeHouseResponseTime(three) + phoneWorkTimeAVG(four) + grabASingle(five) + avgControlScore(six);
+
+        return total * 100 / 120 + "";
+    }
+
+    /** @return the current date formatted as yyyy-MM-dd. */
+    private String date() {
+        SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd");
+        return format.format(new Date());
+    }
+
+
+    @Override
+    public void close() throws UDFException {
+        // nothing to release
+    }
+
+    /**
+     * Sub-score for the average control score: [80, 100] gives 20, [60, 80) gives 10,
+     * (0, 60) gives 5, anything else 0. The bands are contiguous so that averages such as
+     * 79.5 or 59.5 no longer fall between them.
+     */
+    private int avgControlScore(double six) {
+        if (six >= 80 && six <= 100) {
+            return 20;
+        } else if (six >= 60 && six < 80) {
+            return 10;
+        } else if (six > 0 && six < 60) {
+            return 5;
+        }
+        return 0;
+    }
+
+    /** Sub-score for order-grab clicks; not scored yet, always 0. */
+    private int grabASingle(int five) {
+        return 0;
+    }
+
+    /** Sub-score for the average phone call duration; not scored yet, always 0. */
+    private int phoneWorkTimeAVG(double four) {
+        return 0;
+    }
+
+    /** Sub-score for the average response time: (0, 30) gives 20, [30, 60] gives 10, above 60 gives 5. */
+    private int videoSeeHouseResponseTime(double three) {
+        if (three > 0 && three < 30) {
+            return 20;
+        } else if (three >= 30 && three <= 60) {
+            return 10;
+        } else if (three > 60) {
+            return 5;
+        }
+        return 0;
+    }
+
+    /** Sub-score for the average video call duration in seconds: over 5 min gives 20, over 3 min 10, over 1 min 5. */
+    private int videoSeeHouseCallTime(double two) {
+        if (two > 5*60) {
+            return 20;
+        } else if (two > 3*60) {
+            return 10;
+        } else if (two > 1*60) {
+            return 5;
+        }
+
+        return 0;
+    }
+
+    /** Sub-score for the number of IM messages: more than 8 gives 20, 6 to 8 gives 10, 1 to 5 gives 5. */
+    private int imInfoScore(int one) {
+        if (one > 8) {
+            return 20;
+        } else if (one > 5) {
+            return 10;
+        } else if (one > 0) {
+            return 5;
+        }
+        return 0;
+    }
+
+
+    /**
+     * Interprets {@code endTime} ("yyyy-MM-dd HH:mm:ss") as an offset from
+     * "1970-01-01 00:00:00" and returns that offset in whole seconds.
+     *
+     * @return the offset in seconds; 0 when it is not positive or the text cannot be parsed
+     */
+    private long diffTime(String endTime) {
+        try {
+            DateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
+            // both values are parsed with the same formatter, so its time zone cancels out
+            Date startDate = df.parse("1970-01-01 00:00:00");
+            Date endDate = df.parse(endTime);
+            long diff = endDate.getTime() - startDate.getTime();
+            return diff > 0 ? diff / 1000 : 0L;
+        } catch(Exception ignored) {
+            // best effort: an unparsable (or null) duration counts as zero seconds
+            return 0L;
+        }
+    }
+
+    /**
+     * The six string arguments of one input row, plus the "~"-split views of the id, count
+     * and value columns.
+     */
+    public static class Adviser {
+        String brandId;
+        String houseId;
+        String userId;
+        String idValues;
+        String countDatas;
+        String valueDatas;
+        String[] idValuesList;
+        String[] countDatasList;
+        String[] valueDatasList;
+
+
+        /**
+         * @param args the UDTF arguments; the first six entries must be strings (or null)
+         */
+        public Adviser(Object[] args) {
+            this.brandId = getValue(args, 0);
+            this.houseId =  getValue(args, 1);
+            this.userId = getValue(args, 2);
+            this.idValues = getValue(args, 3);
+            this.countDatas = getValue(args, 4);
+            this.valueDatas = getValue(args, 5);
+            // a null or empty column yields an empty list instead of a NullPointerException
+            this.idValuesList = splitOrEmpty(this.idValues);
+            this.countDatasList = splitOrEmpty(this.countDatas);
+            this.valueDatasList = splitOrEmpty(this.valueDatas);
+        }
+
+        public String getBrandId() {
+            return brandId;
+        }
+
+        public void setBrandId(String brandId) {
+            this.brandId = brandId;
+        }
+
+        public String getHouseId() {
+            return houseId;
+        }
+
+        public void setHouseId(String houseId) {
+            this.houseId = houseId;
+        }
+
+        public String getUserId() {
+            return userId;
+        }
+
+        public void setUserId(String userId) {
+            this.userId = userId;
+        }
+
+        public String getIdValues() {
+            return idValues;
+        }
+
+        public void setIdValues(String idValues) {
+            this.idValues = idValues;
+        }
+
+        public String getCountDatas() {
+            return countDatas;
+        }
+
+        public void setCountDatas(String countDatas) {
+            this.countDatas = countDatas;
+        }
+
+        public String getValueDatas() {
+            return valueDatas;
+        }
+
+        public void setValueDatas(String valueDatas) {
+            this.valueDatas = valueDatas;
+        }
+
+        public String[] getIdValuesList() {
+            return idValuesList;
+        }
+
+        public void setIdValuesList(String[] idValuesList) {
+            this.idValuesList = idValuesList;
+        }
+
+        public String[] getCountDatasList() {
+            return countDatasList;
+        }
+
+        public void setCountDatasList(String[] countDatasList) {
+            this.countDatasList = countDatasList;
+        }
+
+        public String[] getValueDatasList() {
+            return valueDatasList;
+        }
+
+        public void setValueDatasList(String[] valueDatasList) {
+            this.valueDatasList = valueDatasList;
+        }
+
+        /** @return {@code args[index]} cast to String. */
+        private String getValue(Object[] args, int index) {
+            return (String)args[index];
+        }
+
+        /** @return the "~"-separated parts of {@code joined}, or an empty array for null/empty input. */
+        private static String[] splitOrEmpty(String joined) {
+            if (StringUtils.isNullOrEmpty(joined)) {
+                return new String[0];
+            }
+            return joined.split("~");
+        }
+    }
+
+}

+ 44 - 0
src/test/java/Test.java

@@ -0,0 +1,44 @@
+import java.text.SimpleDateFormat;
+import java.util.ArrayList;
+import java.util.Calendar;
+import java.util.Date;
+import java.util.List;
+
+/**
+ * Scratch class holding a day-of-week helper and a small {@code String.split} demo.
+ */
+public class Test {
+
+    /**
+     * Labels indexed by {@code Calendar.DAY_OF_WEEK - 1}: Sunday maps to "7",
+     * Monday to "1", and so on up to Saturday as "6".
+     */
+    private static final String[] WEEK_DAYS = { "7", "1", "2", "3", "4", "5", "6" };
+
+    /**
+     * Returns the day of the week of a date as a digit string, Monday = "1" through Sunday = "7".
+     *
+     * @param pTime a date in {@code yyyy-MM-dd} form, parsed with {@link SimpleDateFormat}
+     * @return "1" (Monday) through "7" (Sunday)
+     * @throws Throwable a {@code java.text.ParseException} when {@code pTime} cannot be parsed;
+     *                   the broad declaration is kept so existing callers compile unchanged
+     */
+    public static String dayForWeek(String pTime) throws Throwable {
+        SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd");
+        Date tmpDate = format.parse(pTime);
+
+        // parse() either returns a date or throws, so setTime() needs no catch-all around it.
+        Calendar cal = Calendar.getInstance();
+        cal.setTime(tmpDate);
+
+        // DAY_OF_WEEK runs from 1 (Sunday) to 7 (Saturday), so the index is always within 0..6.
+        return WEEK_DAYS[cal.get(Calendar.DAY_OF_WEEK) - 1];
+    }
+
+    /**
+     * Prints the number of parts produced by splitting "北京市," on a comma.
+     * {@code String.split} drops trailing empty strings, so the printed length is 1.
+     *
+     * @param args unused
+     * @throws Throwable declared for compatibility; nothing in the body throws a checked exception
+     */
+    public static void main(String[] args) throws Throwable {
+        String[] s = "北京市,".split(",");
+        System.out.println(s.length);
+    }
+}

BIN
target/classes/com/alibaba/dataworks/udtf/AdviserScoreUDTF$Adviser.class


BIN
target/classes/com/alibaba/dataworks/udtf/AdviserScoreUDTF.class


BIN
target/test-classes/Test.class


BIN
target/test-classes/com/aliyun/odps/examples/TestUtil.class


BIN
target/test-classes/com/aliyun/odps/examples/graph/Kmeans$KmeansAggrValue.class


BIN
target/test-classes/com/aliyun/odps/examples/graph/Kmeans$KmeansAggregator.class


BIN
target/test-classes/com/aliyun/odps/examples/graph/Kmeans$KmeansVertex.class


BIN
target/test-classes/com/aliyun/odps/examples/graph/Kmeans$KmeansVertexReader.class


BIN
target/test-classes/com/aliyun/odps/examples/graph/Kmeans.class


BIN
target/test-classes/com/aliyun/odps/examples/graph/PageRank$PageRankVertex.class


BIN
target/test-classes/com/aliyun/odps/examples/graph/PageRank$PageRankVertexReader.class


BIN
target/test-classes/com/aliyun/odps/examples/graph/PageRank.class


BIN
target/test-classes/com/aliyun/odps/examples/graph/SSSP$MinLongCombiner.class


BIN
target/test-classes/com/aliyun/odps/examples/graph/SSSP$SSSPVertex.class


BIN
target/test-classes/com/aliyun/odps/examples/graph/SSSP$SSSPVertexReader.class


BIN
target/test-classes/com/aliyun/odps/examples/graph/SSSP.class


BIN
target/test-classes/com/aliyun/odps/examples/mr/Resource$TokenizerMapper.class


BIN
target/test-classes/com/aliyun/odps/examples/mr/Resource.class


BIN
target/test-classes/com/aliyun/odps/examples/mr/WordCount$SumCombiner.class


BIN
target/test-classes/com/aliyun/odps/examples/mr/WordCount$SumReducer.class


BIN
target/test-classes/com/aliyun/odps/examples/mr/WordCount$TokenizerMapper.class


BIN
target/test-classes/com/aliyun/odps/examples/mr/WordCount.class


BIN
target/test-classes/com/aliyun/odps/examples/mr/test/WordCountTest.class


BIN
target/test-classes/com/aliyun/odps/examples/udf/UDAFExample.class


BIN
target/test-classes/com/aliyun/odps/examples/udf/UDAFResource.class


BIN
target/test-classes/com/aliyun/odps/examples/udf/UDFExample.class


BIN
target/test-classes/com/aliyun/odps/examples/udf/UDFResource.class


BIN
target/test-classes/com/aliyun/odps/examples/udf/UDTFExample.class


BIN
target/test-classes/com/aliyun/odps/examples/udf/UDTFResource.class


BIN
target/test-classes/com/aliyun/odps/examples/udf/test/UDAFTest.class


BIN
target/test-classes/com/aliyun/odps/examples/udf/test/UDFTest.class


BIN
target/test-classes/com/aliyun/odps/examples/udf/test/UDTFTest.class


BIN
target/test-classes/com/aliyun/odps/examples/udj/PayUserLogMergeJoin.class


BIN
target/test-classes/com/aliyun/odps/examples/unstructured/SpeechSentenceSnrExtractor.class


BIN
target/test-classes/com/aliyun/odps/examples/unstructured/SpeechStorageHandler.class


BIN
target/test-classes/com/aliyun/odps/examples/unstructured/TextExtractor$1.class


BIN
target/test-classes/com/aliyun/odps/examples/unstructured/TextExtractor.class


BIN
target/test-classes/com/aliyun/odps/examples/unstructured/TextOutputer.class


BIN
target/test-classes/com/aliyun/odps/examples/unstructured/TextStorageHandler.class


BIN
target/test-classes/com/aliyun/odps/examples/unstructured/UtteranceLabel.class


BIN
target/test-classes/com/aliyun/odps/examples/unstructured/test/ExtractorTest.class


BIN
target/test-classes/com/aliyun/odps/examples/unstructured/test/OutputerTest.class


+ 6 - 0
target/test-classes/data/ambulance_csv/1.csv

@@ -0,0 +1,6 @@
+1,1,51,1,46.81006,-92.08174,9/14/2014 0:00,S
+1,2,13,1,46.81006,-92.08174,9/14/2014 0:00,NE
+1,3,48,1,46.81006,-92.08174,9/14/2014 0:00,NE
+1,4,30,1,46.81006,-92.08174,9/14/2014 0:00,W
+1,5,47,1,46.81006,-92.08174,9/14/2014 0:00,S
+1,6,9,1,46.81006,-92.08174,9/14/2014 0:00,S

+ 9 - 0
target/test-classes/data/ambulance_csv/2.csv

@@ -0,0 +1,9 @@
+1,1,40,1,46.81006,-92.08174,9/15/2014 0:00,NE
+1,2,33,1,46.81006,-92.08174,9/15/2014 0:00,NE
+1,3,60,1,46.81006,-92.08174,9/15/2014 0:00,NW
+1,4,50,1,46.81006,-92.08174,9/15/2014 0:00,SW
+1,5,50,1,46.81006,-92.08174,9/15/2014 0:00,S
+1,6,53,1,46.81006,-92.08174,9/15/2014 0:00,NE
+1,7,60,1,46.81006,-92.08174,9/15/2014 0:00,NE
+1,8,75,1,46.81006,-92.08174,9/15/2014 0:00,E
+1,9,75,1,46.81006,-92.08174,9/15/2014 0:00,E

BIN
target/test-classes/data/speech_wav/tsh148_seg_2_3013_3_6_48_80bd359827e24dd7_0.wav


BIN
target/test-classes/data/speech_wav/tsh148_seg_3013_1_31_11_9d7c87aef9f3e559_0.wav


BIN
target/test-classes/data/speech_wav/tsh148_seg_3013_2_29_49_f4cb0990a6b4060c_0.wav


+ 3 - 0
temp/mr_20200824202014_624_15228/input/example_project/wc_in1/__schema__

@@ -0,0 +1,3 @@
+project=example_project
+table=wc_in1
+columns=col1:STRING,col2:STRING,col3:STRING,col4:STRING

+ 4 - 0
temp/mr_20200824202014_624_15228/input/example_project/wc_in1/data

@@ -0,0 +1,4 @@
+A2,A3
+A2,A3
+A2,A3
+A2,A3

+ 5 - 0
temp/mr_20200824202014_624_15228/input/example_project/wc_in2/__schema__

@@ -0,0 +1,5 @@
+project=example_project
+table=wc_in2
+columns=cola:STRING,colb:STRING,colc:STRING
+partitions=p1:STRING,p2:STRING
+

+ 3 - 0
temp/mr_20200824202014_624_15228/input/example_project/wc_in2/p1=2/p2=1/data

@@ -0,0 +1,3 @@
+three1,three2,three3
+three1,three2,three3
+three1,three2,three3

+ 15 - 0
temp/mr_20200824202014_624_15228/job.xml

@@ -0,0 +1,15 @@
+<?xml version="1.0" encoding="UTF-8" standalone="no"?><configuration>
+<property><name>odps.mapred.reduce.class</name><value>com.aliyun.odps.examples.mr.WordCount$SumReducer</value></property>
+<property><name>odps.mapred.local.download.mode</name><value>AUTO</value></property>
+<property><name>odps.mapred.local.record.download.limit</name><value>100</value></property>
+<property><name>odps.mapred.map.class</name><value>com.aliyun.odps.examples.mr.WordCount$TokenizerMapper</value></property>
+<property><name>odps.mapred.output.desc</name><value>[{"tblName":"wc_out","partSpec":{},"label":"__default__"}]</value></property>
+<property><name>odps.mapred.local.input.column.seperator</name><value>,</value></property>
+<property><name>odps.mapred.resources</name><value/></property>
+<property><name>odps.mapred.mapoutput.key.schema</name><value>word:STRING</value></property>
+<property><name>odps.mapred.combine.class</name><value>com.aliyun.odps.examples.mr.WordCount$SumCombiner</value></property>
+<property><name>odps.mapred.job.name</name><value>mr_20200824202014_624_15228</value></property>
+<property><name>odps.mapred.mapoutput.value.schema</name><value>count:BIGINT</value></property>
+<property><name>odps.mapred.local.temp.retain</name><value>true</value></property>
+<property><name>odps.mapred.input.desc</name><value>[{"tblName":"wc_in1","partSpec":{},"cols":["col2","col3"],"label":"__default__"},{"tblName":"wc_in2","partSpec":{"p1":"2","p2":"1"},"label":"__default__"}]</value></property>
+</configuration>

+ 5 - 0
temp/mr_20200824202014_624_15228/output/__default__/R_000000

@@ -0,0 +1,5 @@
+A2,4
+A3,4
+three1,3
+three2,3
+three3,3

+ 3 - 0
temp/mr_20200824202014_624_15228/output/__default__/__schema__

@@ -0,0 +1,3 @@
+project=example_project
+table=wc_out
+columns=word:STRING,cnt:BIGINT

+ 3 - 0
warehouse/example_project/__resources__/file_resource.txt

@@ -0,0 +1,3 @@
+line1
+line2
+line3

+ 3 - 0
warehouse/example_project/__resources__/kmeans_centers

@@ -0,0 +1,3 @@
+1,0
+1,1
+0,1

File diff suppressed because it is too large
+ 10 - 0
warehouse/example_project/__resources__/speech_model_random_5_utterance


+ 1 - 0
warehouse/example_project/__resources__/table_resource1/__ref__

@@ -0,0 +1 @@
+example_project.wc_in1

+ 1 - 0
warehouse/example_project/__resources__/table_resource2/__ref__

@@ -0,0 +1 @@
+example_project.wc_in2(p1=2,p2=2)

+ 3 - 0
warehouse/example_project/__tables__/ads_log/__schema__

@@ -0,0 +1,3 @@
+project=example_project
+table=ads_log
+columns=AdId:BIGINT,Rand:DOUBLE,AdvertiserName:STRING,Comment:STRING

+ 7 - 0
warehouse/example_project/__tables__/ads_log/data

@@ -0,0 +1,7 @@
+399266,0.5,Doritos,what is up
+399266,0,Tacobell,hello!
+382045,-76,Voelkl,random comments
+382045,6.4,Whistler Resort,a
+106479,98.7,Amazon Prime,bdcd
+906441,-9865788.2,Hayden Planetarium,platium
+351530,0.005,Microsoft Azure Services,tst

+ 3 - 0
warehouse/example_project/__tables__/kmeans_in/__schema__

@@ -0,0 +1,3 @@
+project=example_project
+table=kmeans_in
+columns=dim1:DOUBLE,dum2:DOUBLE

+ 6 - 0
warehouse/example_project/__tables__/kmeans_in/data

@@ -0,0 +1,6 @@
+0.0,1.1
+0.1,0.9
+1.0,1.1
+1.1,1.0
+1.1,0.0
+1.0,0.1

+ 3 - 0
warehouse/example_project/__tables__/kmeans_out/__schema__

@@ -0,0 +1,3 @@
+project=example_project
+table=kmeans_out
+columns=dim1:DOUBLE,dim2:DOUBLE

+ 4 - 0
warehouse/example_project/__tables__/kmeans_out/data

@@ -0,0 +1,4 @@
+0,1;2
+1,3
+2,3
+3,-1

+ 3 - 0
warehouse/example_project/__tables__/pagerank_in/__schema__

@@ -0,0 +1,3 @@
+project=example_project
+table=pagerank_in
+columns=vertex:STRING,des_1:STRING,des_2:STRING

+ 0 - 0
warehouse/example_project/__tables__/pagerank_in/data


Some files were not shown because too many files changed in this diff