/*
 * Copyright (c) 2015 Maxim Yunusov
 *    Licensed under the Apache License, Version 2.0 (the "License");
 *    you may not use this file except in compliance with the License.
 *    You may obtain a copy of the License at
 *
 *        http://www.apache.org/licenses/LICENSE-2.0
 *
 *    Unless required by applicable law or agreed to in writing, software
 *    distributed under the License is distributed on an "AS IS" BASIS,
 *    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 *    See the License for the specific language governing permissions and
 *    limitations under the License.
 */

package org.maxur.perfmodel.backend.infrastructure;

import org.iq80.leveldb.DB;
import org.iq80.leveldb.DBIterator;
import org.iq80.leveldb.Options;
import org.iq80.leveldb.WriteBatch;
import org.iq80.leveldb.impl.Iq80DBFactory;
import org.maxur.perfmodel.backend.service.Benchmark;
import org.maxur.perfmodel.backend.service.DataSource;
import org.maxur.perfmodel.backend.service.Database;
import org.slf4j.Logger;

import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import javax.inject.Named;
import java.io.*;
import java.util.Collection;
import java.util.HashSet;
import java.util.Optional;

import static org.iq80.leveldb.impl.Iq80DBFactory.asString;
import static org.iq80.leveldb.impl.Iq80DBFactory.bytes;
import static org.slf4j.LoggerFactory.getLogger;

/**
 * {@link DataSource} and {@link Database} implementation that keeps its data in a
 * LevelDB store opened through {@link Iq80DBFactory}.
 *
 * <p>Keys are strings converted with {@code Iq80DBFactory.bytes}; values are
 * {@link Serializable} objects stored in Java serialization format.
 */
public class DataSourceLevelDbImpl implements DataSource, Database {

    private static final Logger LOGGER = getLogger(DataSourceLevelDbImpl.class);

    /** Handle of the opened store; {@code null} while the store is not open. */
    private DB db;

    /** Folder that holds the LevelDB files; presumably injected by the container - confirm. */
    @SuppressWarnings("unused")
    @Named("db.folderName")
    private String dbFolderName;

    /**
     * Opens the LevelDB store located in the configured folder, creating it when it
     * does not exist yet. Does nothing when the store is already open.
     *
     * @throws IllegalStateException if the store cannot be opened
     */
    @Override
    @PostConstruct
    public void init() {
        if (db != null) {
            return;
        }
        final Options options = new Options();
        options.createIfMissing(true);
        try {
            final Iq80DBFactory factory = Iq80DBFactory.factory;
            db = factory.open(new File(dbFolderName), options);
            LOGGER.info("LevelDb Database ({}) is opened", factory.toString());
        } catch (IOException e) {
            final String msg = "LevelDb Database is not opened";
            LOGGER.error(msg, e);
            throw new IllegalStateException(msg, e);
        }
    }

    /**
     * Closes the store if it is open. A failure to close is logged and not propagated.
     *
     * <p>Calling this method when the store was never opened (or is already closed)
     * is a no-op.
     */
    @Override
    @PreDestroy
    public void stop() {
        if (db == null) {
            // Nothing was opened (or it is already closed): avoid a NullPointerException.
            return;
        }
        try {
            db.close();
            // Cleared only after a successful close so that init() can reopen the store
            // and a failed close can still be retried on the same handle.
            db = null;
            LOGGER.info("LevelDb Database is closed now");
        } catch (IOException e) {
            LOGGER.error("LevelDb Database is not closed", e);
        }
    }

    /**
     * Reads and deserializes the value stored under the given key.
     *
     * @param key the key to look up
     * @param <T> the type the caller expects; the cast is unchecked
     * @return the deserialized value, or an empty {@code Optional} when the store
     *         returns no bytes for the key
     * @throws IOException            if the stored bytes cannot be read as an object stream
     * @throws ClassNotFoundException if the class of the stored object cannot be found
     */
    @Override
    @Benchmark
    public <T> Optional<T> get(final String key) throws IOException, ClassNotFoundException {
        return Optional.<T>ofNullable(objectFrom(db.get(bytes(key))));
    }

    /**
     * Collects the deserialized values of all entries whose key starts with the given prefix.
     *
     * <p>The iterator is positioned with {@code seek(prefix)} and the scan stops at the
     * first key that does not start with the prefix (this assumes the iterator yields
     * keys in sorted order). The result is a {@link HashSet}, so equal values collapse
     * into one element and iteration order is unspecified.
     *
     * @param prefix the key prefix to match
     * @param <T>    the type the caller expects; the cast is unchecked
     * @return the matching values; empty when no key matches
     * @throws IOException            if a stored value cannot be read or the iterator fails to close
     * @throws ClassNotFoundException if the class of a stored object cannot be found
     */
    @Override
    @Benchmark
    public <T> Collection<T> findAllByPrefix(final String prefix) throws IOException, ClassNotFoundException {
        final Collection<T> values = new HashSet<>();
        try (DBIterator iterator = db.iterator()) {
            for (iterator.seek(bytes(prefix)); iterator.hasNext(); iterator.next()) {
                final String key = asString(iterator.peekNext().getKey());
                if (!key.startsWith(prefix)) {
                    break;
                }
                final byte[] value = iterator.peekNext().getValue();
                values.add(objectFrom(value));
            }
        }
        return values;
    }

    /**
     * Deletes the entry stored under the given key.
     *
     * @param key the key to delete
     */
    @Override
    @Benchmark
    public void delete(String key) {
        db.delete(bytes(key));
    }

    /**
     * Creates a new write batch on the underlying store.
     *
     * @return the new batch
     */
    @Override
    @Benchmark
    public WriteBatch createWriteBatch() {
        return db.createWriteBatch();
    }

    /**
     * Writes the given batch to the underlying store.
     *
     * @param batch the batch to write
     */
    @Override
    @Benchmark
    public void commit(WriteBatch batch) {
        db.write(batch);
    }

    /**
     * Serializes the value and stores it under the given key.
     *
     * @param key   the key to store under
     * @param value the value to serialize and store
     * @throws IOException if the value cannot be serialized
     */
    @Override
    @Benchmark
    public void put(final String key, final Serializable value) throws IOException {
        db.put(bytes(key), bytesFrom(value));
    }

    /**
     * Deserializes an object from its Java serialization form.
     *
     * <p>NOTE(review): this uses native Java deserialization, which is only safe while
     * the content of the store is trusted.
     *
     * @param bytes the serialized form, may be {@code null}
     * @param <T>   the type the caller expects; the cast is unchecked
     * @return the deserialized object, or {@code null} when {@code bytes} is {@code null}
     * @throws IOException            if the bytes cannot be read as an object stream
     * @throws ClassNotFoundException if the class of the serialized object cannot be found
     */
    private static <T> T objectFrom(final byte[] bytes) throws IOException, ClassNotFoundException {
        if (bytes == null) {
            return null;
        }
        try (ByteArrayInputStream bis = new ByteArrayInputStream(bytes);
             ObjectInput in = new ObjectInputStream(bis)) {
            //noinspection unchecked
            return (T) in.readObject();
        }
    }

    /**
     * Serializes an object into its Java serialization form.
     *
     * @param object the object to serialize, may be {@code null}
     * @return the serialized bytes, or {@code null} when {@code object} is {@code null}
     * @throws IOException if the object cannot be serialized
     */
    public static byte[] bytesFrom(final Serializable object) throws IOException {
        if (object == null) {
            return null;
        }
        try (
            ByteArrayOutputStream bos = new ByteArrayOutputStream();
            ObjectOutput out = new ObjectOutputStream(bos)
        ) {
            out.writeObject(object);
            // Push anything still buffered by the object stream into the byte array
            // before it is copied out.
            out.flush();
            return bos.toByteArray();
        }
    }

}