Skip to content

Commit

Permalink
Merge branch 'release/0.11.1'
Browse files Browse the repository at this point in the history
Conflicts:
	CHANGES.md
	project/Build.scala
  • Loading branch information
Katya Gonina committed May 21, 2015
2 parents e814142 + dfa3b65 commit 089babd
Show file tree
Hide file tree
Showing 9 changed files with 86 additions and 45 deletions.
6 changes: 6 additions & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,11 @@
# Storehaus #

### Version 0.11.1 ###
* storehaus-memcache: pass ttl for MergeableMemcacheStore CAS calls #262
* Upgrade Finagle and Util #265
* Ugprade finagle-memcached to finagle-memcachedx #266
* Elasticsearch test increase timeout #267

### Version 0.11.0 ###
* Add correct String/ChannelBuffer injections #257
* initial scalatest migration #260
Expand Down
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ The [`MergeableStore`](http://twitter.github.com/storehaus/#com.twitter.storehau

Storehaus provides a number of modules wrapping existing key-value stores. Enriching these key-value stores with Storehaus's combinators has been hugely helpful to us here at Twitter. Writing your jobs in terms of Storehaus stores makes it easy to test your jobs; use an in-memory `JMapStore` in testing and a `MemcacheStore` in production.

* [Storehaus-memcache](http://twitter.github.com/storehaus/#com.twitter.storehaus.memcache.MemcacheStore) (wraps Twitter's [finagle-memcached](https://github.com/twitter/finagle/tree/master/finagle-memcached) library)
* [Storehaus-memcache](http://twitter.github.com/storehaus/#com.twitter.storehaus.memcache.MemcacheStore) (wraps Twitter's [finagle-memcachedx](https://github.com/twitter/finagle/tree/master/finagle-memcachedx) library)
* [Storehaus-mysql](http://twitter.github.com/storehaus/#com.twitter.storehaus.mysql.MySqlStore) (wraps Twitter's [finagle-mysql](https://github.com/twitter/finagle/tree/master/finagle-mysql) library)
* [Storehaus-redis](http://twitter.github.com/storehaus/#com.twitter.storehaus.redis.RedisStore) (wraps Twitter's [finagle-redis](https://github.com/twitter/finagle/tree/master/finagle-redis) library)
* [Storehaus-hbase](http://twitter.github.com/storehaus/#com.twitter.storehaus.hbase.HBaseStore)
Expand Down
12 changes: 6 additions & 6 deletions project/Build.scala
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ object StorehausBuild extends Build {
val sharedSettings = extraSettings ++ ciSettings ++ Seq(
organization := "com.twitter",
scalaVersion := "2.10.5",
version := "0.11.0",
version := "0.11.1",
crossScalaVersions := Seq("2.10.5"),
javacOptions ++= Seq("-source", "1.6", "-target", "1.6"),
javacOptions in doc := Seq("-source", "1.6"),
Expand Down Expand Up @@ -110,13 +110,13 @@ object StorehausBuild extends Build {
def youngestForwardCompatible(subProj: String) =
Some(subProj)
.filterNot(unreleasedModules.contains(_))
.map { s => "com.twitter" % ("storehaus-" + s + "_2.10") % "0.11.0" }
.map { s => "com.twitter" % ("storehaus-" + s + "_2.10") % "0.11.1" }

val algebirdVersion = "0.10.0"
val algebirdVersion = "0.10.1"
val bijectionVersion = "0.8.0"
val utilVersion = "6.22.0"
val utilVersion = "6.24.0"
val scaldingVersion = "0.14.0"
val finagleVersion = "6.22.0"
val finagleVersion = "6.25.0"
val scalatestVersion = "2.2.4"
val specs2Version = "1.13"
lazy val storehaus = Project(
Expand Down Expand Up @@ -178,7 +178,7 @@ object StorehausBuild extends Build {
"com.twitter" %% "algebird-core" % algebirdVersion,
"com.twitter" %% "bijection-core" % bijectionVersion,
"com.twitter" %% "bijection-netty" % bijectionVersion,
"com.twitter" %% "finagle-memcached" % finagleVersion excludeAll(
"com.twitter" %% "finagle-memcachedx" % finagleVersion excludeAll(
// we don't use this and its not on maven central.
ExclusionRule("com.twitter.common.zookeeper"),
ExclusionRule("com.twitter.common")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ trait DefaultElasticContext {
val homeDir = new File(tempFile.getParent + "/" + UUID.randomUUID().toString)
val test_index = "test_index"
val test_type = "test_type"
val DEFAULT_TIMEOUT = 4 * 1000
val DEFAULT_TIMEOUT = 10 * 1000

homeDir.mkdir()
homeDir.deleteOnExit()
Expand All @@ -52,7 +52,7 @@ trait DefaultElasticContext {
node.client()
}
private implicit val formats = native.Serialization.formats(NoTypeHints)
lazy val store = ElasticSearchCaseClassStore[Person](test_index, test_type, client)
val store = ElasticSearchCaseClassStore[Person](test_index, test_type, client)

def refreshIndex(): Unit = {
refresh(test_index)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@

package com.twitter.storehaus.elasticsearch

import org.scalatest.{OneInstancePerTest, Matchers, WordSpec}
import org.scalatest.{OneInstancePerTest, BeforeAndAfter, Matchers, WordSpec}
import com.twitter.util.{Future, Await}
import com.twitter.storehaus.FutureOps
import org.elasticsearch.action.search.SearchRequestBuilder
Expand All @@ -28,12 +28,18 @@ import org.json4s.{native, NoTypeHints}
* @author Mansur Ashraf
* @since 1/13/14
*/
class ElasticSearchStoreSpecs extends WordSpec with Matchers with OneInstancePerTest with DefaultElasticContext {
class ElasticSearchStoreSpecs extends WordSpec with Matchers
with BeforeAndAfter with OneInstancePerTest with DefaultElasticContext {

private implicit val formats = native.Serialization.formats(NoTypeHints)

private val person = Person("Joe", "Smith", 29)

before {
// wait for the shards to load up
block(DEFAULT_TIMEOUT)
}

"ElasticSearch Store" should {

"Put a value" in {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,10 @@ package com.twitter.storehaus.memcache

import com.twitter.algebird.Semigroup
import com.twitter.bijection.NumericInjections
import com.twitter.bijection.twitter_util.UtilBijections
import com.twitter.util.{Duration, Future}
import com.twitter.finagle.memcached.Client
import com.twitter.finagle.memcachedx.Client
import com.twitter.io.Buf
import com.twitter.storehaus.ConvertedStore
import com.twitter.storehaus.algebra.MergeableStore
import org.jboss.netty.buffer.ChannelBuffer
Expand All @@ -33,11 +35,17 @@ import com.twitter.bijection.netty.ChannelBufferBijection
* @author Doug Tangren
*/
object MemcacheLongStore {
import UtilBijections.Owned.byteArrayBufBijection

private implicit val cb2ary = ChannelBufferBijection
// Long => String => ChannelBuffer <= String <= Long
private [memcache] implicit val LongInjection: Injection[Long, ChannelBuffer] =
private[memcache] implicit val LongChannelBuffer: Injection[Long, ChannelBuffer] =
Injection.connect[Long, String, Array[Byte], ChannelBuffer]

// Long => String => Buf <= String <= Long
private[memcache] implicit val LongBuf: Injection[Long, Buf] =
Injection.connect[Long, String, Array[Byte], Buf]

def apply(client: Client, ttl: Duration = MemcacheStore.DEFAULT_TTL, flag: Int = MemcacheStore.DEFAULT_FLAG) =
new MemcacheLongStore(MemcacheStore(client, ttl, flag))
}
Expand All @@ -58,7 +66,7 @@ class MemcacheLongStore(underlying: MemcacheStore)
case None => // memcache does not create on increment
underlying
.client
.add(k, v.as[ChannelBuffer])
.add(k, v.as[Buf])
.flatMap { b => if(b) Future.value(None) else merge(kv) }
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,11 @@ import com.twitter.bijection.{ Bijection, Codec, Injection }
import com.twitter.bijection.netty.Implicits._
import com.twitter.conversions.time._
import com.twitter.finagle.builder.ClientBuilder
import com.twitter.finagle.memcached.KetamaClientBuilder
import com.twitter.finagle.memcached.protocol.text.Memcached
import com.twitter.finagle.memcachedx.KetamaClientBuilder
import com.twitter.finagle.memcachedx.protocol.text.Memcached
import com.twitter.finagle.netty3.{BufChannelBuffer, ChannelBufferBuf}
import com.twitter.util.{ Duration, Future, Time }
import com.twitter.finagle.memcached.{ GetResult, Client }
import com.twitter.finagle.memcachedx.{ GetResult, Client }
import com.twitter.storehaus.{ FutureOps, Store, WithPutTtl }
import com.twitter.storehaus.algebra.MergeableStore
import org.jboss.netty.buffer.ChannelBuffer
Expand Down Expand Up @@ -113,29 +114,36 @@ object MemcacheStore {
MergeableMemcacheStore[K, V](client, ttl, flag, retries)(kfn)(inj, implicitly[Semigroup[V]])
}

class MemcacheStore(val client: Client, ttl: Duration, flag: Int)
class MemcacheStore(val client: Client, val ttl: Duration, val flag: Int)
extends Store[String, ChannelBuffer]
with WithPutTtl[String, ChannelBuffer, MemcacheStore]
{
override def withPutTtl(ttl: Duration) = new MemcacheStore(client, ttl, flag)

override def get(k: String): Future[Option[ChannelBuffer]] = client.get(k)
override def get(k: String): Future[Option[ChannelBuffer]] =
client.get(k).flatMap {
case None => Future.None
case Some(buf) => Future.value(Some(ChannelBufferBuf.Owned.extract(buf)))
}

override def multiGet[K1 <: String](ks: Set[K1]): Map[K1, Future[Option[ChannelBuffer]]] = {
val memcacheResult: Future[Map[String, Future[Option[ChannelBuffer]]]] =
client.getResult(ks).map { result =>
result.hits.mapValues { v => Future.value(Some(v.value)) } ++
result.failures.mapValues { Future.exception(_) }
result.hits.mapValues { v =>
Future.value(Some(BufChannelBuffer(v.value)))
} ++ result.failures.mapValues(Future.exception)
}
FutureOps.liftValues(ks, memcacheResult, { (k: K1) => Future.value(Future.None) })
.mapValues { _.flatten }
}

protected def set(k: String, v: ChannelBuffer) = client.set(k, flag, ttl.fromNow, v)
protected def set(k: String, v: ChannelBuffer) =
client.set(k, flag, ttl.fromNow, ChannelBufferBuf.Owned(v))

override def put(kv: (String, Option[ChannelBuffer])): Future[Unit] =
kv match {
case (key, Some(value)) => client.set(key, flag, ttl.fromNow, value)
case (key, Some(value)) =>
client.set(key, flag, ttl.fromNow, ChannelBufferBuf.Owned(value))
case (key, None) => client.delete(key).unit
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,9 @@ package com.twitter.storehaus.memcache

import com.twitter.algebird.Semigroup
import com.twitter.bijection.Injection
import com.twitter.finagle.memcached.Client
import com.twitter.finagle.memcachedx.Client
import com.twitter.finagle.netty3.{BufChannelBuffer, ChannelBufferBuf}
import com.twitter.io.Buf
import com.twitter.storehaus.ConvertedStore
import com.twitter.storehaus.algebra.MergeableStore
import com.twitter.util.{ Duration, Future }
Expand Down Expand Up @@ -71,29 +73,40 @@ class MergeableMemcacheStore[K, V](underlying: MemcacheStore, maxRetries: Int)(k
val key = kfn(kv._1)
(currentRetry > maxRetries) match {
case false => // use 'gets' api to obtain casunique token
underlying.client.gets(key).flatMap { res : Option[(ChannelBuffer, ChannelBuffer)] =>
res match {
case Some((cbValue, casunique)) =>
inj.invert(cbValue) match {
case Success(v) => // attempt cas
val resV = semigroup.plus(v, kv._2)
underlying.client.cas(key, inj.apply(resV), casunique).flatMap { success =>
success.booleanValue match {
case true => Future.value(Some(v))
case false => doMerge(kv, currentRetry + 1) // retry
}
underlying.client.gets(key).flatMap {
case Some((cbValue, casunique)) =>
inj.invert(BufChannelBuffer(cbValue)) match {
case Success(v) => // attempt cas
val resV = semigroup.plus(v, kv._2)
val buf = ChannelBufferBuf.Owned(inj.apply(resV))
underlying.client.cas(
key,
underlying.flag,
underlying.ttl.fromNow,
buf,
casunique
).flatMap { success =>
success.booleanValue match {
case true => Future.value(Some(v))
case false => doMerge(kv, currentRetry + 1) // retry
}
case Failure(ex) => Future.exception(ex)
}
// no pre-existing value, try to 'add' it
case None =>
underlying.client.add(key, inj.apply(kv._2)).flatMap { success =>
success.booleanValue match {
case true => Future.None
case false => doMerge(kv, currentRetry + 1) // retry, next retry should call cas
}
case Failure(ex) => Future.exception(ex)
}
// no pre-existing value, try to 'add' it
case None =>
val buf = ChannelBufferBuf.Owned(inj.apply(kv._2))
underlying.client.add(
key,
underlying.flag,
underlying.ttl.fromNow,
buf
).flatMap { success =>
success.booleanValue match {
case true => Future.None
case false => doMerge(kv, currentRetry + 1) // retry, next retry should call cas
}
}
}
}
// no more retries
case true => Future.exception(new MergeFailedException(key))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ package com.twitter.storehaus.memcache
import com.twitter.algebird.Semigroup
import com.twitter.bijection.{ Injection, NumericInjections }
import com.twitter.bijection.netty.ChannelBufferBijection
import com.twitter.finagle.memcached.Client
import com.twitter.finagle.memcachedx.Client
import com.twitter.storehaus.testing.SelfAggregatingCloseableCleanup
import com.twitter.storehaus.testing.generator.NonEmpty
import com.twitter.util.{Await, Future}
Expand Down

0 comments on commit 089babd

Please sign in to comment.