1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package org.maxur.perfmodel.backend.infrastructure;
17
18 import org.iq80.leveldb.DB;
19 import org.iq80.leveldb.DBIterator;
20 import org.iq80.leveldb.Options;
21 import org.iq80.leveldb.WriteBatch;
22 import org.iq80.leveldb.impl.Iq80DBFactory;
23 import org.maxur.perfmodel.backend.service.Benchmark;
24 import org.maxur.perfmodel.backend.service.DataSource;
25 import org.maxur.perfmodel.backend.service.Database;
26 import org.slf4j.Logger;
27
28 import javax.annotation.PostConstruct;
29 import javax.annotation.PreDestroy;
30 import javax.inject.Named;
31 import java.io.*;
32 import java.util.Collection;
33 import java.util.HashSet;
34 import java.util.Optional;
35
36 import static org.iq80.leveldb.impl.Iq80DBFactory.asString;
37 import static org.iq80.leveldb.impl.Iq80DBFactory.bytes;
38 import static org.slf4j.LoggerFactory.getLogger;
39
40 public class DataSourceLevelDbImpl implements DataSource, Database {
41
42 private static final Logger LOGGER = getLogger(DataSourceLevelDbImpl.class);
43
44 private DB db;
45
46 @SuppressWarnings("unused")
47 @Named("db.folderName")
48 private String dbFolderName;
49
50 @Override
51 @PostConstruct
52 public void init() {
53 if (db != null) {
54 return;
55 }
56 final Options options = new Options();
57 options.createIfMissing(true);
58 try {
59 final Iq80DBFactory factory = Iq80DBFactory.factory;
60 db = factory.open(new File(dbFolderName), options);
61 LOGGER.info("LevelDb Database ({}) is opened", factory.toString());
62 } catch (IOException e) {
63 final String msg = "LevelDb Database is not opened";
64 LOGGER.error(msg, e);
65 throw new IllegalStateException(msg, e);
66 }
67 }
68
69 @Override
70 @PreDestroy
71 public void stop() {
72 try {
73 db.close();
74 LOGGER.info("LevelDb Database is closed now");
75 } catch (IOException e) {
76 LOGGER.error("LevelDb Database is not closed", e);
77 }
78 }
79
80 @Override
81 @Benchmark
82 public <T> Optional<T> get(final String key) throws IOException, ClassNotFoundException {
83 return Optional.<T>ofNullable(objectFrom(db.get(bytes(key))));
84 }
85
86 @Override
87 @Benchmark
88 public <T> Collection<T> findAllByPrefix(final String prefix) throws IOException, ClassNotFoundException {
89 final Collection<T> values = new HashSet<>();
90 try (DBIterator iterator = db.iterator()) {
91 for (iterator.seek(bytes(prefix)); iterator.hasNext(); iterator.next()) {
92 final String key = asString(iterator.peekNext().getKey());
93 if (!key.startsWith(prefix)) {
94 break;
95 }
96 final byte[] value = iterator.peekNext().getValue();
97 values.add(objectFrom(value));
98 }
99 }
100 return values;
101 }
102
103 @Override
104 @Benchmark
105 public void delete(String key) {
106 db.delete(bytes(key));
107 }
108
109 @Override
110 @Benchmark
111 public WriteBatch createWriteBatch() {
112 return db.createWriteBatch();
113 }
114
115 @Override
116 @Benchmark
117 public void commit(WriteBatch batch) {
118 db.write(batch);
119 }
120
121 @Override
122 @Benchmark
123 public void put(final String key, final Serializable value) throws IOException {
124 db.put(bytes(key), bytesFrom(value));
125 }
126
127 private static <T> T objectFrom(final byte[] bytes) throws IOException, ClassNotFoundException {
128 if (bytes == null) {
129 return null;
130 }
131 try (ByteArrayInputStream bis = new ByteArrayInputStream(bytes); ObjectInput in = new ObjectInputStream(bis)) {
132
133 return (T) in.readObject();
134 }
135 }
136
137
138 public static byte[] bytesFrom(final Serializable object) throws IOException {
139 if (object == null) {
140 return null;
141 }
142 try (
143 ByteArrayOutputStream bos = new ByteArrayOutputStream();
144 ObjectOutput out = new ObjectOutputStream(bos)
145 ) {
146 out.writeObject(object);
147 return bos.toByteArray();
148 }
149 }
150
151 }