View Javadoc

1   /* Licensed to the Apache Software Foundation (ASF) under one or more
2    * contributor license agreements.  See the NOTICE file distributed with
3    * this work for additional information regarding copyright ownership.
4    * The ASF licenses this file to You under the Apache License, Version 2.0
5    * (the "License"); you may not use this file except in compliance with
6    * the License.  You may obtain a copy of the License at
7    *
8    *     http://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS,
12   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13   * See the License for the specific language governing permissions and
14   * limitations under the License.
15   */
16  
17  package net.sf.eos.hadoop.mapred.index;
18  
19  import java.io.IOException;
20  
21  import org.apache.hadoop.conf.Configuration;
22  import org.apache.hadoop.fs.FSDataInputStream;
23  import org.apache.hadoop.fs.FSDataOutputStream;
24  import org.apache.hadoop.fs.FileStatus;
25  import org.apache.hadoop.fs.FileSystem;
26  import org.apache.hadoop.fs.Path;
27  import org.apache.lucene.store.BufferedIndexInput;
28  import org.apache.lucene.store.BufferedIndexOutput;
29  import org.apache.lucene.store.Directory;
30  import org.apache.lucene.store.IndexInput;
31  import org.apache.lucene.store.IndexOutput;
32  import org.apache.lucene.store.Lock;
33  
34  /**
35   * Reads a Lucene index stored in Hadoops DFS.
36   * <p>Copied from Nutch source code.</p>
37   * @see org.apache.nutch.indexer.FsDirectory
38   * @author Nutch Team
39   */
40  public class FsDirectory extends Directory {
41  
42    private FileSystem fs;
43    private Path directory;
44    private int ioFileBufferSize;
45  
46    public FsDirectory(final FileSystem fs, final Path directory,
47            final boolean create, final Configuration conf)
48      throws IOException {
49  
50      this.fs = fs;
51      this.directory = directory;
52      this.ioFileBufferSize = conf.getInt("io.file.buffer.size", 4096);
53  
54      if (create) {
55        create();
56      }
57  
58      final FileStatus fileStatus = fs.getFileStatus(directory);
59      if ( ! fileStatus.isDir()) {
60        throw new IOException(directory + " not a directory");
61      }
62    }
63  
64    private void create() throws IOException {
65      if (! this.fs.exists(this.directory)) {
66          this.fs.mkdirs(this.directory);
67      }
68  
69      final FileStatus fileStatus = this.fs.getFileStatus(this.directory);
70  
71      if (! fileStatus.isDir()) {
72        throw new IOException(this.directory + " not a directory");
73      }
74  
75      // clear old files
76      Path[] files = this.fs.listPaths(new Path[] {this.directory});
77      for (int i = 0; i < files.length; i++) {
78        if (! this.fs.delete(files[i]))
79          throw new IOException("Cannot delete " + files[i]);
80      }
81    }
82  
83    @Override
84  public String[] list() throws IOException {
85      Path[] files = this.fs.listPaths(new Path[] {this.directory});
86      if (files == null) return null;
87  
88      String[] result = new String[files.length];
89      for (int i = 0; i < files.length; i++) {
90        result[i] = files[i].getName();
91      }
92      return result;
93    }
94  
95    @Override
96  public boolean fileExists(final String name) throws IOException {
97      return this.fs.exists(new Path(this.directory, name));
98    }
99  
100   @Override
101 public long fileModified(String name) {
102     throw new UnsupportedOperationException();
103   }
104 
105   @Override
106 public void touchFile(String name) {
107     throw new UnsupportedOperationException();
108   }
109 
110   @Override
111 public long fileLength(String name) throws IOException {
112         final Path path = new Path(this.directory, name);
113         final FileStatus fileStatus = this.fs.getFileStatus(path);
114         return fileStatus.getLen();
115   }
116 
117   @Override
118 public void deleteFile(String name) throws IOException {
119     if (! this.fs.delete(new Path(this.directory, name)))
120       throw new IOException("Cannot delete " + name);
121   }
122 
123   @Override
124 public void renameFile(String from, String to) throws IOException {
125     // DFS is currently broken when target already exists,
126     // so we explicitly delete the target first.
127     Path target = new Path(this.directory, to);
128     if (this.fs.exists(target)) {
129         this.fs.delete(target);
130     }
131     this.fs.rename(new Path(this.directory, from), target);
132   }
133 
134   @Override
135 public IndexOutput createOutput(String name) throws IOException {
136     Path file = new Path(directory, name);
137     if (fs.exists(file) && !fs.delete(file))      // delete existing, if any
138       throw new IOException("Cannot overwrite: " + file);
139 
140     return new DfsIndexOutput(file, this.ioFileBufferSize);
141   }
142 
143 
144   @Override
145 public IndexInput openInput(final String name) throws IOException {
146     return new DfsIndexInput(new Path(this.directory, name), this.ioFileBufferSize);
147   }
148 
149   /** {@inheritDoc} */
150   @Override
151 public Lock makeLock(final String name) {
152     return new Lock() {
153       @Override
154     public boolean obtain() {
155         return true;
156       }
157       @Override
158     public void release() {
159       }
160       @Override
161     public boolean isLocked() {
162         throw new UnsupportedOperationException();
163       }
164       @Override
165     public String toString() {
166         return "Lock@" + new Path(directory, name);
167       }
168     };
169   }
170 
171   @Override
172 public synchronized void close() throws IOException {
173       this.fs.close();
174   }
175 
176   @Override
177 public String toString() {
178     return this.getClass().getName() + "@" + this.directory;
179   }
180 
181 
182   private class DfsIndexInput extends BufferedIndexInput {
183 
184     /** Shared by clones. */
185     private class Descriptor {
186       public FSDataInputStream in;
187       public long position;                       // cache of in.getPos()
188       public Descriptor(Path file, int ioFileBufferSize) throws IOException {
189         this.in = fs.open(file);
190       }
191     }
192 
193     private final Descriptor descriptor;
194     private final long length;
195     private boolean isClone;
196 
197     public DfsIndexInput(Path path, int ioFileBufferSize) throws IOException {
198       this.descriptor = new Descriptor(path,ioFileBufferSize);
199       final FileStatus fileStatus = fs.getFileStatus(path);
200       this.length = fileStatus.getLen();
201     }
202 
203     @Override
204     protected void readInternal(byte[] b, int offset, int len)
205       throws IOException {
206       synchronized (this.descriptor) {
207         long position = getFilePointer();
208         if (position != this.descriptor.position) {
209             this.descriptor.in.seek(position);
210             this.descriptor.position = position;
211         }
212         int total = 0;
213         do {
214           int i = this.descriptor.in.read(b, offset+total, len-total);
215           if (i == -1) {
216             throw new IOException("read past EOF");
217           }
218 
219           this.descriptor.position += i;
220           total += i;
221         } while (total < len);
222       }
223     }
224 
225     @Override
226     public void close() throws IOException {
227       if (! this.isClone) {
228           this.descriptor.in.close();
229       }
230     }
231 
232     @Override
233     protected void seekInternal(long position) {} // handled in readInternal()
234 
235     @Override
236     public long length() {
237       return length;
238     }
239 
240     @Override
241     protected void finalize() throws IOException {
242       close();                                      // close the file
243     }
244 
245     @Override
246     public Object clone() {
247       DfsIndexInput clone = (DfsIndexInput)super.clone();
248       clone.isClone = true;
249       return clone;
250     }
251   }
252 
253   private class DfsIndexOutput extends BufferedIndexOutput {
254     private FSDataOutputStream out;
255 
256     public DfsIndexOutput(Path path, int ioFileBufferSize) throws IOException {
257         this.out = fs.create(path);
258     }
259 
260     @Override
261     public void flushBuffer(byte[] b, int offset, int size) throws IOException {
262         this.out.write(b, offset, size);
263     }
264 
265     @Override
266     public void close() throws IOException {
267       super.close();
268       this.out.close();
269     }
270 
271     @Override
272     public void seek(long pos) throws IOException {
273       throw new UnsupportedOperationException();
274     }
275 
276     @Override
277     public long length() throws IOException {
278       return this.out.getPos();
279     }
280 
281     @Override
282     protected void finalize() throws IOException {
283         this.out.close();                                // close the file
284     }
285 
286   }
287 
288 }
289