UDTFResource.java 1.9 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768
  1. package com.aliyun.odps.examples.udf;
  2. import com.aliyun.odps.udf.ExecutionContext;
  3. import com.aliyun.odps.udf.UDFException;
  4. import com.aliyun.odps.udf.UDTF;
  5. import com.aliyun.odps.udf.annotation.Resolve;
  6. import java.io.BufferedReader;
  7. import java.io.IOException;
  8. import java.io.InputStream;
  9. import java.io.InputStreamReader;
  10. import java.util.Iterator;
  11. /**
  12. * project: example_project
  13. * table: wc_in2
  14. * partitions: p2=1,p1=2
  15. * columns: colc,colb
  16. */
  17. @Resolve({"string,string->string,bigint,string"})
  18. public class UDTFResource extends UDTF {
  19. ExecutionContext ctx;
  20. long fileResourceLineCount;
  21. long tableResource1RecordCount;
  22. long tableResource2RecordCount;
  23. @Override
  24. public void setup(ExecutionContext ctx) throws UDFException {
  25. this.ctx = ctx;
  26. try {
  27. InputStream in = ctx.readResourceFileAsStream("file_resource.txt");
  28. BufferedReader br = new BufferedReader(new InputStreamReader(in));
  29. String line;
  30. fileResourceLineCount = 0;
  31. while ((line = br.readLine()) != null) {
  32. fileResourceLineCount++;
  33. }
  34. br.close();
  35. Iterator<Object[]> iterator = ctx.readResourceTable("table_resource1").iterator();
  36. tableResource1RecordCount = 0;
  37. while (iterator.hasNext()) {
  38. tableResource1RecordCount++;
  39. iterator.next();
  40. }
  41. iterator = ctx.readResourceTable("table_resource2").iterator();
  42. tableResource2RecordCount = 0;
  43. while (iterator.hasNext()) {
  44. tableResource2RecordCount++;
  45. iterator.next();
  46. }
  47. } catch (IOException e) {
  48. throw new UDFException(e);
  49. }
  50. }
  51. @Override
  52. public void process(Object[] args) throws UDFException {
  53. String a = (String) args[0];
  54. long b = args[1] == null ? 0 : ((String) args[1]).length();
  55. forward(a, b, "fileResourceLineCount=" + fileResourceLineCount + "|tableResource1RecordCount="
  56. + tableResource1RecordCount + "|tableResource2RecordCount=" + tableResource2RecordCount);
  57. }
  58. }