001/*
002 * Copyright (c) 2015 Maxim Yunusov
003 *    Licensed under the Apache License, Version 2.0 (the "License");
004 *    you may not use this file except in compliance with the License.
005 *    You may obtain a copy of the License at
006 *
007 *        http://www.apache.org/licenses/LICENSE-2.0
008 *
009 *    Unless required by applicable law or agreed to in writing, software
010 *    distributed under the License is distributed on an "AS IS" BASIS,
011 *    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
012 *    See the License for the specific language governing permissions and
013 *    limitations under the License.
014 */
015
016package org.maxur.perfmodel.backend.infrastructure;
017
018import org.iq80.leveldb.DB;
019import org.iq80.leveldb.DBIterator;
020import org.iq80.leveldb.Options;
021import org.iq80.leveldb.WriteBatch;
022import org.iq80.leveldb.impl.Iq80DBFactory;
023import org.maxur.perfmodel.backend.service.Benchmark;
024import org.maxur.perfmodel.backend.service.DataSource;
025import org.maxur.perfmodel.backend.service.Database;
026import org.slf4j.Logger;
027
028import javax.annotation.PostConstruct;
029import javax.annotation.PreDestroy;
030import javax.inject.Named;
031import java.io.*;
032import java.util.Collection;
033import java.util.HashSet;
034import java.util.Optional;
035
036import static org.iq80.leveldb.impl.Iq80DBFactory.asString;
037import static org.iq80.leveldb.impl.Iq80DBFactory.bytes;
038import static org.slf4j.LoggerFactory.getLogger;
039
040public class DataSourceLevelDbImpl implements DataSource, Database {
041
042    private static final Logger LOGGER = getLogger(DataSourceLevelDbImpl.class);
043
044    private DB db;
045
046    @SuppressWarnings("unused")
047    @Named("db.folderName")
048    private String dbFolderName;
049
050    @Override
051    @PostConstruct
052    public void init() {
053        if (db != null) {
054            return;
055        }
056        final Options options = new Options();
057        options.createIfMissing(true);
058        try {
059            final Iq80DBFactory factory = Iq80DBFactory.factory;
060            db = factory.open(new File(dbFolderName), options);
061            LOGGER.info("LevelDb Database ({}) is opened", factory.toString());
062        } catch (IOException e) {
063            final String msg = "LevelDb Database is not opened";
064            LOGGER.error(msg, e);
065            throw new IllegalStateException(msg, e);
066        }
067    }
068
069    @Override
070    @PreDestroy
071    public void stop() {
072        try {
073            db.close();
074            LOGGER.info("LevelDb Database is closed now");
075        } catch (IOException e) {
076            LOGGER.error("LevelDb Database is not closed", e);
077        }
078    }
079
080    @Override
081    @Benchmark
082    public <T> Optional<T> get(final String key) throws IOException, ClassNotFoundException {
083        return Optional.<T>ofNullable(objectFrom(db.get(bytes(key))));
084    }
085
086    @Override
087    @Benchmark
088    public <T> Collection<T> findAllByPrefix(final String prefix) throws IOException, ClassNotFoundException {
089        final Collection<T> values = new HashSet<>();
090        try (DBIterator iterator = db.iterator()) {
091            for (iterator.seek(bytes(prefix)); iterator.hasNext(); iterator.next()) {
092                final String key = asString(iterator.peekNext().getKey());
093                if (!key.startsWith(prefix)) {
094                    break;
095                }
096                final byte[] value = iterator.peekNext().getValue();
097                values.add(objectFrom(value));
098            }
099        }
100        return values;
101    }
102
103    @Override
104    @Benchmark
105    public void delete(String key) {
106        db.delete(bytes(key));
107    }
108
109    @Override
110    @Benchmark
111    public WriteBatch createWriteBatch() {
112        return db.createWriteBatch();
113    }
114
115    @Override
116    @Benchmark
117    public void commit(WriteBatch batch) {
118        db.write(batch);
119    }
120
121    @Override
122    @Benchmark
123    public void put(final String key, final Serializable value) throws IOException {
124        db.put(bytes(key), bytesFrom(value));
125    }
126
127    private static <T> T objectFrom(final byte[] bytes) throws IOException, ClassNotFoundException {
128        if (bytes == null) {
129            return null;
130        }
131        try (ByteArrayInputStream bis = new ByteArrayInputStream(bytes); ObjectInput in = new ObjectInputStream(bis)) {
132            //noinspection unchecked
133            return (T) in.readObject();
134        }
135    }
136
137
138    public static byte[] bytesFrom(final Serializable object) throws IOException {
139        if (object == null) {
140            return null;
141        }
142        try (
143                ByteArrayOutputStream bos = new ByteArrayOutputStream();
144                ObjectOutput out = new ObjectOutputStream(bos)
145        ) {
146            out.writeObject(object);
147            return bos.toByteArray();
148        }
149    }
150
151}