Review Board 1.7.22


sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/json/JSONGroupScan.java
Revision ff5f474 New Change
[20] 15 lines
[+20]
16
 * limitations under the License.
16
 * limitations under the License.
17
 ******************************************************************************/
17
 ******************************************************************************/
18

    
   
18

   
19
package org.apache.drill.exec.store.json;
19
package org.apache.drill.exec.store.json;
20

    
   
20

   
21
import static com.google.common.base.Preconditions.checkArgument;
21
import com.fasterxml.jackson.annotation.*;
22

    
   
22
import org.apache.drill.common.config.DrillConfig;
23
import java.io.File;
23
import org.apache.drill.common.logical.StorageEngineConfig;
24
import java.net.URI;
24
import org.apache.drill.exec.exception.SetupException;
25
import java.util.Collections;

   
26
import java.util.LinkedList;

   
27
import java.util.List;

   
28

    
   

   
29
import org.apache.drill.exec.physical.EndpointAffinity;
25
import org.apache.drill.exec.physical.EndpointAffinity;
30
import org.apache.drill.exec.physical.OperatorCost;
26
import org.apache.drill.exec.physical.OperatorCost;
31
import org.apache.drill.exec.physical.ReadEntry;
27
import org.apache.drill.exec.physical.ReadEntry;

    
   
28
import org.apache.drill.exec.physical.ReadEntryWithPath;
32
import org.apache.drill.exec.physical.base.AbstractGroupScan;
29
import org.apache.drill.exec.physical.base.AbstractGroupScan;
33
import org.apache.drill.exec.physical.base.PhysicalOperator;
30
import org.apache.drill.exec.physical.base.PhysicalOperator;
34
import org.apache.drill.exec.physical.base.Size;
31
import org.apache.drill.exec.physical.base.Size;
35
import org.apache.drill.exec.physical.base.SubScan;
32
import org.apache.drill.exec.physical.base.SubScan;
36
import org.apache.drill.exec.proto.CoordinationProtos;
33
import org.apache.drill.exec.proto.CoordinationProtos;

    
   
34
import org.apache.drill.exec.store.StorageEngineRegistry;

    
   
35
import org.apache.drill.exec.store.parquet.ParquetStorageEngineConfig;
37

    
   
36

   
38
import com.fasterxml.jackson.annotation.JsonCreator;
37
import java.util.Collections;
39
import com.fasterxml.jackson.annotation.JsonProperty;
38
import java.util.LinkedList;
40
import com.fasterxml.jackson.annotation.JsonTypeName;
39
import java.util.List;

    
   
40

   

    
   
41
import static com.google.common.base.Preconditions.checkArgument;
41

    
   
42

   
42
@JsonTypeName("json-scan")
43
@JsonTypeName("json-scan")
43
public class JSONGroupScan extends AbstractGroupScan {
44
public class JSONGroupScan extends AbstractGroupScan {
44
    private static int ESTIMATED_RECORD_SIZE = 1024; // 1kb
45
  private static int ESTIMATED_RECORD_SIZE = 1024; // 1kb

    
   
46
  private final StorageEngineRegistry registry;

    
   
47
  private final StorageEngineConfig engineConfig;
45

    
   
48

   
46
    private LinkedList<JSONGroupScan.ScanEntry>[] mappings;
49
  private LinkedList<JSONGroupScan.ScanEntry>[] mappings;
47
    protected final List<JSONGroupScan.ScanEntry> readEntries;
50
  private final List<JSONGroupScan.ScanEntry> readEntries;
48
    private final OperatorCost cost;
51
  private final OperatorCost cost;
49
    private final Size size;
52
  private final Size size;
50
    
53

   
51
    @JsonCreator
54
  @JsonCreator
52
    public JSONGroupScan(@JsonProperty("entries") List<JSONGroupScan.ScanEntry> readEntries) {
55
  public JSONGroupScan(@JsonProperty("entries") List<ScanEntry> entries,
53
        this.readEntries = readEntries;
56
                       @JsonProperty("storageengine") JSONStorageEngineConfig storageEngineConfig,
54
        OperatorCost cost = new OperatorCost(0,0,0,0);
57
                       @JacksonInject StorageEngineRegistry engineRegistry) throws SetupException {
55
        Size size = new Size(0,0);
58
    engineRegistry.init(DrillConfig.create());
56
        for(JSONGroupScan.ScanEntry r : readEntries){
59
    this.registry = engineRegistry;

    
   
60
    this.engineConfig = storageEngineConfig;

    
   
61
    this.readEntries = entries;

    
   
62
    OperatorCost cost = new OperatorCost(0, 0, 0, 0);

    
   
63
    Size size = new Size(0, 0);

    
   
64
    for (JSONGroupScan.ScanEntry r : readEntries) {
57
          cost = cost.add(r.getCost());
65
      cost = cost.add(r.getCost());
58
          size = size.add(r.getSize());
66
      size = size.add(r.getSize());
59
        }
67
    }
60
        this.cost = cost;
68
    this.cost = cost;
61
        this.size = size;
69
    this.size = size;
[+20] [20] 21 lines
[+20] [+] public void applyAssignments(List<CoordinationProtos.DrillbitEndpoint> endpoints) {
83

    
   
91

   
84
    @SuppressWarnings("unchecked")
92
  @SuppressWarnings("unchecked")
85
    @Override
93
  @Override
86
    public SubScan getSpecificScan(int minorFragmentId) {
94
  public SubScan getSpecificScan(int minorFragmentId) {
87
        checkArgument(minorFragmentId < mappings.length, "Mappings length [%s] should be longer than minor fragment id [%s] but it isn't.", mappings.length, minorFragmentId);
95
    checkArgument(minorFragmentId < mappings.length, "Mappings length [%s] should be longer than minor fragment id [%s] but it isn't.", mappings.length, minorFragmentId);
88
        return new JSONSubScan(mappings[minorFragmentId]);
96
    try {

    
   
97
      return new JSONSubScan(registry, engineConfig, mappings[minorFragmentId]);

    
   
98
    } catch (SetupException e) {

    
   
99
      e.printStackTrace();

    
   
100
    }

    
   
101
    return null;
89
    }
102
  }
90

    
   
103

   
91
    @Override
104
  @Override
92
    public List<EndpointAffinity> getOperatorAffinity() {
105
  public List<EndpointAffinity> getOperatorAffinity() {
93
        return Collections.emptyList();
106
    return Collections.emptyList();
94
    }
107
  }
95
    

   
96
    public List<JSONGroupScan.ScanEntry> getReadEntries() {

   
97
      return readEntries;

   
98
    }

   
99

    
   
108

   
100
    @Override
109
  @Override

    
   
110
  @JsonIgnore
101
    public PhysicalOperator getNewWithChildren(List<PhysicalOperator> children) {
111
  public PhysicalOperator getNewWithChildren(List<PhysicalOperator> children) {
102
        return new JSONGroupScan(readEntries);
112
    try {

    
   
113
      return new JSONGroupScan(readEntries, (JSONStorageEngineConfig) engineConfig, registry);

    
   
114
    } catch (SetupException e) {

    
   
115
      e.printStackTrace();

    
   
116
    }

    
   
117
    return null;
103
    }
118
  }
104

    
   
119

   
105
    public static class ScanEntry implements ReadEntry {
120
  public static class ScanEntry implements ReadEntry {
106
        private final String url;
121
    private final String path;
107
        private Size size;
122
    private Size size;
108

    
   
123

   
109
        @JsonCreator
124
    @JsonCreator
110
        public ScanEntry(@JsonProperty("url") String url) {
125
    public ScanEntry(@JsonProperty("path") String path) {
111
            this.url = url;
126
      this.path = path;
112
            long fileLength = new File(URI.create(url)).length();
127
      size = new Size(ESTIMATED_RECORD_SIZE, ESTIMATED_RECORD_SIZE);
113
            size = new Size(fileLength / ESTIMATED_RECORD_SIZE, ESTIMATED_RECORD_SIZE);

   
114
        }
128
    }
115

    
   
129

   
116
        @Override
130
    @Override
117
        public OperatorCost getCost() {
131
    public OperatorCost getCost() {
118
            return new OperatorCost(1, 1, 2, 2);
132
      return new OperatorCost(1, 1, 2, 2);
119
        }
133
    }
120

    
   
134

   
121
        @Override
135
    @Override
122
        public Size getSize() {
136
    public Size getSize() {
123
            return size;
137
      return size;
124
        }
138
    }
125

    
   
139

   
126
        public String getUrl() {
140
    public String getPath() {
127
            return url;
141
      return path;
128
        }
142
    }
129
    }
143
  }
130

    
   
144

   
131
    @Override
145
  @Override
132
    public int getMaxParallelizationWidth() {
146
  public int getMaxParallelizationWidth() {
[+20] [20] 13 lines
sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/json/JSONRecordReader.java
Revision eee0fb6 New Change
 
sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/json/JSONScanBatchCreator.java
Revision eda6b75 New Change
 
sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/json/JSONStorageEngine.java
New File
 
sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/json/JSONStorageEngineConfig.java
New File
 
sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/json/JSONSubScan.java
Revision fe16b3a New Change
 
sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestSimpleFragmentRun.java
Revision a2612d5 New Change
 
sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/store/JSONRecordReaderTest.java
Revision 2d9524d New Change
 
sandbox/prototype/exec/java-exec/src/test/resources/physical_json_scan_test1.json
Revision 6f08937 New Change
 
  1. sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/json/JSONGroupScan.java: Loading...
  2. sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/json/JSONRecordReader.java: Loading...
  3. sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/json/JSONScanBatchCreator.java: Loading...
  4. sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/json/JSONStorageEngine.java: Loading...
  5. sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/json/JSONStorageEngineConfig.java: Loading...
  6. sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/json/JSONSubScan.java: Loading...
  7. sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestSimpleFragmentRun.java: Loading...
  8. sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/store/JSONRecordReaderTest.java: Loading...
  9. sandbox/prototype/exec/java-exec/src/test/resources/physical_json_scan_test1.json: Loading...