1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17 package net.sf.eos.hadoop.mapred.index;
18
19 import java.io.IOException;
20
21 import org.apache.hadoop.conf.Configuration;
22 import org.apache.hadoop.fs.FSDataInputStream;
23 import org.apache.hadoop.fs.FSDataOutputStream;
24 import org.apache.hadoop.fs.FileStatus;
25 import org.apache.hadoop.fs.FileSystem;
26 import org.apache.hadoop.fs.Path;
27 import org.apache.lucene.store.BufferedIndexInput;
28 import org.apache.lucene.store.BufferedIndexOutput;
29 import org.apache.lucene.store.Directory;
30 import org.apache.lucene.store.IndexInput;
31 import org.apache.lucene.store.IndexOutput;
32 import org.apache.lucene.store.Lock;
33
34
35
36
37
38
39
40 public class FsDirectory extends Directory {
41
42 private FileSystem fs;
43 private Path directory;
44 private int ioFileBufferSize;
45
46 public FsDirectory(final FileSystem fs, final Path directory,
47 final boolean create, final Configuration conf)
48 throws IOException {
49
50 this.fs = fs;
51 this.directory = directory;
52 this.ioFileBufferSize = conf.getInt("io.file.buffer.size", 4096);
53
54 if (create) {
55 create();
56 }
57
58 final FileStatus fileStatus = fs.getFileStatus(directory);
59 if ( ! fileStatus.isDir()) {
60 throw new IOException(directory + " not a directory");
61 }
62 }
63
64 private void create() throws IOException {
65 if (! this.fs.exists(this.directory)) {
66 this.fs.mkdirs(this.directory);
67 }
68
69 final FileStatus fileStatus = this.fs.getFileStatus(this.directory);
70
71 if (! fileStatus.isDir()) {
72 throw new IOException(this.directory + " not a directory");
73 }
74
75
76 Path[] files = this.fs.listPaths(new Path[] {this.directory});
77 for (int i = 0; i < files.length; i++) {
78 if (! this.fs.delete(files[i]))
79 throw new IOException("Cannot delete " + files[i]);
80 }
81 }
82
83 @Override
84 public String[] list() throws IOException {
85 Path[] files = this.fs.listPaths(new Path[] {this.directory});
86 if (files == null) return null;
87
88 String[] result = new String[files.length];
89 for (int i = 0; i < files.length; i++) {
90 result[i] = files[i].getName();
91 }
92 return result;
93 }
94
95 @Override
96 public boolean fileExists(final String name) throws IOException {
97 return this.fs.exists(new Path(this.directory, name));
98 }
99
100 @Override
101 public long fileModified(String name) {
102 throw new UnsupportedOperationException();
103 }
104
105 @Override
106 public void touchFile(String name) {
107 throw new UnsupportedOperationException();
108 }
109
110 @Override
111 public long fileLength(String name) throws IOException {
112 final Path path = new Path(this.directory, name);
113 final FileStatus fileStatus = this.fs.getFileStatus(path);
114 return fileStatus.getLen();
115 }
116
117 @Override
118 public void deleteFile(String name) throws IOException {
119 if (! this.fs.delete(new Path(this.directory, name)))
120 throw new IOException("Cannot delete " + name);
121 }
122
123 @Override
124 public void renameFile(String from, String to) throws IOException {
125
126
127 Path target = new Path(this.directory, to);
128 if (this.fs.exists(target)) {
129 this.fs.delete(target);
130 }
131 this.fs.rename(new Path(this.directory, from), target);
132 }
133
134 @Override
135 public IndexOutput createOutput(String name) throws IOException {
136 Path file = new Path(directory, name);
137 if (fs.exists(file) && !fs.delete(file))
138 throw new IOException("Cannot overwrite: " + file);
139
140 return new DfsIndexOutput(file, this.ioFileBufferSize);
141 }
142
143
144 @Override
145 public IndexInput openInput(final String name) throws IOException {
146 return new DfsIndexInput(new Path(this.directory, name), this.ioFileBufferSize);
147 }
148
149
150 @Override
151 public Lock makeLock(final String name) {
152 return new Lock() {
153 @Override
154 public boolean obtain() {
155 return true;
156 }
157 @Override
158 public void release() {
159 }
160 @Override
161 public boolean isLocked() {
162 throw new UnsupportedOperationException();
163 }
164 @Override
165 public String toString() {
166 return "Lock@" + new Path(directory, name);
167 }
168 };
169 }
170
171 @Override
172 public synchronized void close() throws IOException {
173 this.fs.close();
174 }
175
176 @Override
177 public String toString() {
178 return this.getClass().getName() + "@" + this.directory;
179 }
180
181
182 private class DfsIndexInput extends BufferedIndexInput {
183
184
185 private class Descriptor {
186 public FSDataInputStream in;
187 public long position;
188 public Descriptor(Path file, int ioFileBufferSize) throws IOException {
189 this.in = fs.open(file);
190 }
191 }
192
193 private final Descriptor descriptor;
194 private final long length;
195 private boolean isClone;
196
197 public DfsIndexInput(Path path, int ioFileBufferSize) throws IOException {
198 this.descriptor = new Descriptor(path,ioFileBufferSize);
199 final FileStatus fileStatus = fs.getFileStatus(path);
200 this.length = fileStatus.getLen();
201 }
202
203 @Override
204 protected void readInternal(byte[] b, int offset, int len)
205 throws IOException {
206 synchronized (this.descriptor) {
207 long position = getFilePointer();
208 if (position != this.descriptor.position) {
209 this.descriptor.in.seek(position);
210 this.descriptor.position = position;
211 }
212 int total = 0;
213 do {
214 int i = this.descriptor.in.read(b, offset+total, len-total);
215 if (i == -1) {
216 throw new IOException("read past EOF");
217 }
218
219 this.descriptor.position += i;
220 total += i;
221 } while (total < len);
222 }
223 }
224
225 @Override
226 public void close() throws IOException {
227 if (! this.isClone) {
228 this.descriptor.in.close();
229 }
230 }
231
232 @Override
233 protected void seekInternal(long position) {}
234
235 @Override
236 public long length() {
237 return length;
238 }
239
240 @Override
241 protected void finalize() throws IOException {
242 close();
243 }
244
245 @Override
246 public Object clone() {
247 DfsIndexInput clone = (DfsIndexInput)super.clone();
248 clone.isClone = true;
249 return clone;
250 }
251 }
252
253 private class DfsIndexOutput extends BufferedIndexOutput {
254 private FSDataOutputStream out;
255
256 public DfsIndexOutput(Path path, int ioFileBufferSize) throws IOException {
257 this.out = fs.create(path);
258 }
259
260 @Override
261 public void flushBuffer(byte[] b, int offset, int size) throws IOException {
262 this.out.write(b, offset, size);
263 }
264
265 @Override
266 public void close() throws IOException {
267 super.close();
268 this.out.close();
269 }
270
271 @Override
272 public void seek(long pos) throws IOException {
273 throw new UnsupportedOperationException();
274 }
275
276 @Override
277 public long length() throws IOException {
278 return this.out.getPos();
279 }
280
281 @Override
282 protected void finalize() throws IOException {
283 this.out.close();
284 }
285
286 }
287
288 }
289