Akka

Stella981

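In real applications, sharing some data across a cluster is unavoidable. By this I mean data that any node can read and write concurrently; the hard part is resolving conflicting updates. A distributed database could provide this, but the cost of running and maintaining one is often too high to be worthwhile. Distributed Data (akka-distributed-data, or ddata) is designed for exactly this situation. Akka provides a set of CRDTs (Conflict-free Replicated Data Types) and a management API for sharing data across the cluster without conflicts.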

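Akka's distributed data sharing is implemented by the replicator. A replicator is an actor; once one is running on every node of the cluster, replicators with the same actor path (ignoring the address part) communicate over a gossip protocol, as if they were connected into a single replicator network. The replicator offers an API for resolving update conflicts and synchronizing data. The shared data structures live inside each node's replicator. To update a value, a program sends its local replicator a message carrying the key of the shared data and a function that modifies it; the replicator applies the change locally, propagates it to the replicators on the other nodes via gossip, and resolves any conflicts that arise during synchronization. Because the data lives inside the replicator, reading a value likewise means sending a read message to the local replicator.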

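As an actor, the replicator can be started by declaring the akka.cluster.ddata.DistributedData extension in the .conf file, or built directly with Replicator.props. In my opinion, building the actor directly is more flexible, and it also lets you run several replicators on one node, because replicators on different nodes are grouped by actor path. Here is how to build a replicator with Replicator.props: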

  val replicator = system.actorOf(Replicator.props(
    ReplicatorSettings(system).withGossipInterval(1.second)), "replicator")

Alternatively, using akka.extensions in the configuration file:

akka {
   extensions = ["akka.cluster.ddata.DistributedData"]
 ...
}

val replicator = DistributedData(context.system).replicator

CRDTs are stored as key/value data: each replicated value is identified by a typed key. The value types are mainly counters, flags, sets and maps, including:

/**
 * Implements a boolean flag CRDT that is initialized to `false` and
 * can be switched to `true`. `true` wins over `false` in merge.
 *
 * This class is immutable, i.e. "modifying" methods return a new instance.
 */
final case class Flag(enabled: Boolean) 
final case class FlagKey(_id: String) 

/**
 * Implements a 'Growing Counter' CRDT, also called a 'G-Counter'.
 * A G-Counter is a increment-only counter (inspired by vector clocks) in
 * which only increment and merge are possible. Incrementing the counter
 * adds 1 to the count for the current node. Divergent histories are
 * resolved by taking the maximum count for each node (like a vector
 * clock merge). The value of the counter is the sum of all node counts.
 *
 * This class is immutable, i.e. "modifying" methods return a new instance.
 */
final class GCounter
final case class GCounterKey(_id: String)

/**
 * Implements a 'Increment/Decrement Counter' CRDT, also called a 'PN-Counter'.
 * PN-Counters allow the counter to be incremented by tracking the
 * increments (P) separate from the decrements (N). Both P and N are represented
 * as two internal [[GCounter]]s. Merge is handled by merging the internal P and N
 * counters. The value of the counter is the value of the P counter minus
 * the value of the N counter.
 *
 * This class is immutable, i.e. "modifying" methods return a new instance.
 */
final class PNCounter 
final case class PNCounterKey(_id: String)

/**
 * Implements a 'Add Set' CRDT, also called a 'G-Set'. You can't
 * remove elements of a G-Set.
 * A G-Set doesn't accumulate any garbage apart from the elements themselves.
 * This class is immutable, i.e. "modifying" methods return a new instance.
 */
final case class GSet[A]
final case class GSetKey[A](_id: String)

/**
 * Implements a 'Observed Remove Set' CRDT, also called a 'OR-Set'.
 * Elements can be added and removed any number of times. Concurrent add wins
 * over remove.
 *
 * The ORSet has a version vector that is incremented when an element is added to
 * the set. The `node -> count` pair for that increment is stored against the
 * element as its "birth dot". Every time the element is re-added to the set,
 * its "birth dot" is updated to that of the `node -> count` version vector entry
 * resulting from the add. When an element is removed, we simply drop it, no tombstones.
 *
 * When an element exists in replica A and not replica B, is it because A added
 * it and B has not yet seen that, or that B removed it and A has not yet seen that?
 * In this implementation we compare the `dot` of the present element to the version vector
 * in the Set it is absent from. If the element dot is not "seen" by the Set version vector,
 * that means the other set has yet to see this add, and the item is in the merged
 * Set. If the Set version vector dominates the dot, that means the other Set has removed this
 * element already, and the item is not in the merged Set.
 *
 * This class is immutable, i.e. "modifying" methods return a new instance.
 */
final class ORSet[A]
final case class ORSetKey[A](_id: String)

/**
 * Implements a 'Observed Remove Map' CRDT, also called a 'OR-Map'.
 *
 * It has similar semantics as an [[ORSet]], but in case of concurrent updates
 * the values are merged, and must therefore be [[ReplicatedData]] types themselves.
 *
 * This class is immutable, i.e. "modifying" methods return a new instance.
 */
final class ORMap[A, B <: ReplicatedData]
final case class ORMapKey[A, B <: ReplicatedData](_id: String)

/**
 * An immutable multi-map implementation. This class wraps an
 * [[ORMap]] with an [[ORSet]] for the map's value.
 *
 * This class is immutable, i.e. "modifying" methods return a new instance.
 */
final class ORMultiMap[A, B]
final case class ORMultiMapKey[A, B](_id: String)

/**
 * Map of named counters. Specialized [[ORMap]] with [[PNCounter]] values.
 *
 * This class is immutable, i.e. "modifying" methods return a new instance.
 */
final class PNCounterMap[A]
final case class PNCounterMapKey[A](_id: String)

In summary, the ready-made CRDT types that Akka provides are:

Counters: GCounter, PNCounter
Sets: GSet, ORSet
Maps: ORMap, ORMultiMap, LWWMap, PNCounterMap
Registers: LWWRegister, Flag
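
To make the merge rule quoted in the GCounter scaladoc above more concrete, here is a minimal sketch of a grow-only counter in plain Scala. It is not Akka's implementation: `GCounterSketch`, `SimpleGCounter` and `NodeId` are hypothetical names used only for illustration, and node identity is reduced to a string.

```scala
// Simplified G-Counter for illustration only; NOT Akka's GCounter.
// All names here (GCounterSketch, SimpleGCounter, NodeId) are hypothetical.
object GCounterSketch {
  type NodeId = String

  final case class SimpleGCounter(counts: Map[NodeId, Long] = Map.empty) {
    // Each node only ever increments its own slot.
    def increment(node: NodeId, n: Long = 1L): SimpleGCounter = {
      require(n >= 0, "a G-Counter can only grow")
      copy(counts = counts.updated(node, counts.getOrElse(node, 0L) + n))
    }

    // The counter value is the sum of all per-node counts.
    def value: Long = counts.values.sum

    // Merge takes the per-node maximum, which makes it commutative,
    // associative and idempotent: replicas can merge in any order.
    def merge(that: SimpleGCounter): SimpleGCounter =
      SimpleGCounter((counts.keySet ++ that.counts.keySet).map { node =>
        node -> math.max(counts.getOrElse(node, 0L), that.counts.getOrElse(node, 0L))
      }.toMap)
  }
}
```

Two replicas that increment independently converge to the same total whichever side performs the merge, and merging a replica with itself changes nothing.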

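The results of CRDT operations can also be obtained by subscription. A user sends a Subscribe message to the replicator to be notified of changes to the data under a given Key[A]: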

 /**
   * Register a subscriber that will be notified with a [[Changed]] message
   * when the value of the given `key` is changed. Current value is also
   * sent as a [[Changed]] message to a new subscriber.
   *
   * Subscribers will be notified periodically with the configured `notify-subscribers-interval`,
   * and it is also possible to send an explicit `FlushChanges` message to
   * the `Replicator` to notify the subscribers immediately.
   *
   * The subscriber will automatically be unregistered if it is terminated.
   *
   * If the key is deleted the subscriber is notified with a [[Deleted]]
   * message.
   */
  final case class Subscribe[A <: ReplicatedData](key: Key[A], subscriber: ActorRef) extends ReplicatorMessage
  /**
   * Unregister a subscriber.
   *
   * @see [[Replicator.Subscribe]]
   */
  final case class Unsubscribe[A <: ReplicatedData](key: Key[A], subscriber: ActorRef) extends ReplicatorMessage
  /**
   * The data value is retrieved with [[#get]] using the typed key.
   *
   * @see [[Replicator.Subscribe]]
   */
  final case class Changed[A <: ReplicatedData](key: Key[A])(data: A) extends ReplicatorMessage {
    /**
     * The data value, with correct type.
     * Scala pattern matching cannot infer the type from the `key` parameter.
     */
    def get[T <: ReplicatedData](key: Key[T]): T = {
      require(key == this.key, "wrong key used, must use contained key")
      data.asInstanceOf[T]
    }

    /**
     * The data value. Use [[#get]] to get the fully typed value.
     */
    def dataValue: A = data
  }

  final case class Deleted[A <: ReplicatedData](key: Key[A]) extends NoSerializationVerificationNeeded {
    override def toString: String = s"Deleted [$key]"
  }

After completing an operation, the replicator sends Changed or Deleted messages for that Key[A] to its subscribers.

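Distributed data is read and written by sending messages to the local replicator. The read/write messages are Update, Get and Delete. Use Get to read data; alternatively, subscribe to the CRDT's Changed and Deleted notifications.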

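What gives a CRDT its replicated, conflict-free behavior is arguably the way the replicator processes the Update message. The Update message is defined as follows: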

  final case class Update[A <: ReplicatedData](key: Key[A], writeConsistency: WriteConsistency,request: Option[Any])(val modify: Option[A] ⇒ A) 
extends Command[A] with NoSerializationVerificationNeeded {...}

def apply[A <: ReplicatedData](
      key: Key[A], initial: A, writeConsistency: WriteConsistency,
      request: Option[Any] = None)(modify: A ⇒ A): Update[A] =
      Update(key, writeConsistency, request)(modifyWithInitial(initial, modify))

private def modifyWithInitial[A <: ReplicatedData](initial: A, modify: A ⇒ A): Option[A] ⇒ A = {
      case Some(data) ⇒ modify(data)
      case None       ⇒ modify(initial)
    }

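As we can see, Update carries the data identifier Key[A] and a function modify: Option[A] => A, which the replicator uses to transform the CRDT value A. The apply factory additionally takes an initial value of type A, which is used the first time the key is referenced; this is evident from modifyWithInitial and the way apply calls it. Here is how the Update message is used: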

  val timeout = 3.seconds.dilated

  val KeyA = GCounterKey("A")
  val KeyB = ORSetKey[String]("B")
  val KeyC = PNCounterMapKey[String]("C")
  val KeyD = ORMultiMapKey[String, String]("D")
  val KeyE = ORMapKey[String, GSet[String]]("E")

  replicator ! Update(KeyA, GCounter(), WriteAll(timeout))(_ + 3)

  replicator ! Update(KeyB, ORSet(), WriteAll(timeout))(_ + "a" + "b" + "c")

  replicator ! Update(KeyC, PNCounterMap.empty[String], WriteAll(timeout)) { _ increment "x" increment "y" }

  replicator ! Update(KeyD, ORMultiMap.empty[String, String], WriteAll(timeout)) { _ + ("a" → Set("A")) }

  replicator ! Update(KeyE, ORMap.empty[String, GSet[String]], WriteAll(timeout)) { _ + ("a" → GSet.empty[String].add("A")) }
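
The role of the initial value in Update.apply can be tried out without a cluster. The sketch below repeats the shape of modifyWithInitial from the definition above and applies it to a tiny in-memory holder. `UpdateSketch` and `LocalEntry` are hypothetical stand-ins for the replicator's local storage of a single key, not Akka classes.

```scala
// Illustration of how `initial` is only used when no value exists yet.
// UpdateSketch and LocalEntry are hypothetical names, not part of Akka.
object UpdateSketch {
  // Same shape as the helper shown above: lift a plain A => A into the
  // Option[A] => A function that is applied to the stored value.
  def modifyWithInitial[A](initial: A, modify: A => A): Option[A] => A = {
    case Some(data) => modify(data)
    case None       => modify(initial)
  }

  // Stand-in for the local storage of one key inside a replicator.
  final class LocalEntry[A] {
    private var current: Option[A] = None

    def update(initial: A)(modify: A => A): A = {
      val next = modifyWithInitial(initial, modify)(current)
      current = Some(next)
      next
    }
  }
}
```

On the first update the entry is empty, so `modify` runs on `initial`; on later updates `initial` is ignored and `modify` runs on the stored value.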

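Because CRDT data is read and written by sending messages, the results also come back as messages. The reply to a read carries the resulting data. These are the reply message types for reads and writes: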

/*------------------UPDATE STATE MESSAGES -----------*/
  final case class UpdateSuccess[A <: ReplicatedData](key: Key[A], request: Option[Any])
    extends UpdateResponse[A] with DeadLetterSuppression
  sealed abstract class UpdateFailure[A <: ReplicatedData] extends UpdateResponse[A]

  /**
   * The direct replication of the [[Update]] could not be fulfill according to
   * the given [[WriteConsistency consistency level]] and
   * [[WriteConsistency#timeout timeout]].
   *
   * The `Update` was still performed locally and possibly replicated to some nodes.
   * It will eventually be disseminated to other replicas, unless the local replica
   * crashes before it has been able to communicate with other replicas.
   */
  final case class UpdateTimeout[A <: ReplicatedData](key: Key[A], request: Option[Any]) extends UpdateFailure[A]
  /**
   * If the `modify` function of the [[Update]] throws an exception the reply message
   * will be this `ModifyFailure` message. The original exception is included as `cause`.
   */
  final case class ModifyFailure[A <: ReplicatedData](key: Key[A], errorMessage: String, cause: Throwable, request: Option[Any])
    extends UpdateFailure[A] {
    override def toString: String = s"ModifyFailure [$key]: $errorMessage"
  }
  /**
   * The local store or direct replication of the [[Update]] could not be fulfill according to
   * the given [[WriteConsistency consistency level]] due to durable store errors. This is
   * only used for entries that have been configured to be durable.
   *
   * The `Update` was still performed in memory locally and possibly replicated to some nodes,
   * but it might not have been written to durable storage.
   * It will eventually be disseminated to other replicas, unless the local replica
   * crashes before it has been able to communicate with other replicas.
   */
  final case class StoreFailure[A <: ReplicatedData](key: Key[A], request: Option[Any])
    extends UpdateFailure[A] with DeleteResponse[A]

/* ---------------- GET MESSAGES --------*/
  /**
   * Reply from `Get`. The data value is retrieved with [[#get]] using the typed key.
   */
  final case class GetSuccess[A <: ReplicatedData](key: Key[A], request: Option[Any])(data: A)
    extends GetResponse[A] with ReplicatorMessage {

    /**
     * The data value, with correct type.
     * Scala pattern matching cannot infer the type from the `key` parameter.
     */
    def get[T <: ReplicatedData](key: Key[T]): T = {
      require(key == this.key, "wrong key used, must use contained key")
      data.asInstanceOf[T]
    }

    /**
     * The data value. Use [[#get]] to get the fully typed value.
     */
    def dataValue: A = data
  }
  final case class NotFound[A <: ReplicatedData](key: Key[A], request: Option[Any])
    extends GetResponse[A] with ReplicatorMessage


/*----------------DELETE MESSAGES ---------*/
  final case class DeleteSuccess[A <: ReplicatedData](key: Key[A], request: Option[Any]) extends DeleteResponse[A]
  final case class ReplicationDeleteFailure[A <: ReplicatedData](key: Key[A], request: Option[Any]) extends DeleteResponse[A]
  final case class DataDeleted[A <: ReplicatedData](key: Key[A], request: Option[Any])
    extends RuntimeException with NoStackTrace with DeleteResponse[A] {
    override def toString: String = s"DataDeleted [$key]"
  }

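The read reply defines def dataValue: A for getting the data, and the typed method get(key), which takes the Key[A] that was read and returns the fully typed value. Here are some read examples: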

val replicator = DistributedData(system).replicator
val Counter1Key = PNCounterKey("counter1")
val Set1Key = GSetKey[String]("set1")
val Set2Key = ORSetKey[String]("set2")
val ActiveFlagKey = FlagKey("active")

replicator ! Get(Counter1Key, ReadLocal)

val readFrom3 = ReadFrom(n = 3, timeout = 1.second)
replicator ! Get(Set1Key, readFrom3)

val readMajority = ReadMajority(timeout = 5.seconds)
replicator ! Get(Set2Key, readMajority)

val readAll = ReadAll(timeout = 5.seconds)
replicator ! Get(ActiveFlagKey, readAll)

case g @ GetSuccess(Counter1Key, req) ⇒
  val value = g.get(Counter1Key).value
case NotFound(Counter1Key, req) ⇒ // key counter1 does not exist

...

case g @ GetSuccess(Set1Key, req) ⇒
  val elements = g.get(Set1Key).elements
case GetFailure(Set1Key, req) ⇒
// read from 3 nodes failed within 1.second
case NotFound(Set1Key, req)   ⇒ // key set1 does not exist

/*---- return get result to user (sender())  ----*/

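  case "get-count" ⇒
    // incoming request to retrieve current value of the counter
    // (readTwo is not defined in this excerpt; assume a read consistency
    //  such as ReadFrom(n = 2, timeout = 3.seconds))
    replicator ! Get(Counter1Key, readTwo, request = Some(sender()))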

  case g @ GetSuccess(Counter1Key, Some(replyTo: ActorRef)) ⇒
    val value = g.get(Counter1Key).value.longValue
    replyTo ! value
  case GetFailure(Counter1Key, Some(replyTo: ActorRef)) ⇒
    replyTo ! -1L
  case NotFound(Counter1Key, Some(replyTo: ActorRef)) ⇒
    replyTo ! 0L
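
The read examples above differ mainly in how many replicas must answer before the replicator replies. As a rough illustration, the sketch below computes that number for a cluster of a given size. It is an assumption-laden simplification, not Akka code: `ConsistencySketch`, `Level`, `required` and `overlaps` are hypothetical names, the majority is taken as N/2 + 1, and options such as a minimum cap on the majority are ignored.

```scala
// Rough model of read/write consistency levels; NOT Akka's implementation.
// Level, required and overlaps are hypothetical names for illustration.
object ConsistencySketch {
  sealed trait Level
  case object Local extends Level               // like ReadLocal / WriteLocal
  final case class From(n: Int) extends Level   // like ReadFrom(n) / WriteTo(n)
  case object Majority extends Level            // like ReadMajority / WriteMajority
  case object All extends Level                 // like ReadAll / WriteAll

  // Number of replicas (including the local one) that must take part.
  def required(level: Level, clusterSize: Int): Int = level match {
    case Local    => 1
    case From(n)  => n
    case Majority => clusterSize / 2 + 1
    case All      => clusterSize
  }

  // A read is guaranteed to meet at least one replica that saw the write
  // when the two replica counts together exceed the cluster size.
  def overlaps(write: Level, read: Level, clusterSize: Int): Boolean =
    required(write, clusterSize) + required(read, clusterSize) > clusterSize
}
```

For example, in a five-node cluster a majority write followed by a majority read involves 3 + 3 replicas, so at least one replica is common to both, whereas a local write followed by a local read on another node gives no such guarantee.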

Here is how to obtain state changes through subscription:

  replicator ! Subscribe(DataKey, self)

...

    case c @ Changed(DataKey) ⇒
      val data = c.get(DataKey)
      log.info("Current elements: {}", data.elements)
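
The `c @ Changed(DataKey)` pattern followed by `c.get(DataKey)` may look redundant. As the scaladoc quoted earlier notes, pattern matching cannot infer the data type from the key, so the typed key is passed again to recover it. The following sketch reproduces the idiom with simplified, hypothetical versions of Key and Changed (`TypedKeySketch`, `StringSetKey`, `LongKey` and `describe` are made-up names); it illustrates the pattern and is not the Akka source.

```scala
// Simplified model of the typed-key idiom; names are hypothetical.
object TypedKeySketch {
  // The key carries the type of the value stored under it.
  abstract class Key[+A](val id: String)
  final case class StringSetKey(_id: String) extends Key[Set[String]](_id)
  final case class LongKey(_id: String) extends Key[Long](_id)

  // The data sits in a second parameter list, so it is not part of the
  // pattern and can only be reached through the typed `get`.
  final case class Changed[A](key: Key[A])(data: A) {
    def get[T](k: Key[T]): T = {
      require(k == key, "wrong key used, must use contained key")
      data.asInstanceOf[T]
    }
  }

  val SetKey = StringSetKey("set")
  val CountKey = LongKey("count")

  // Matching on the key selects the message; get(key) restores the type.
  def describe(msg: Any): String = msg match {
    case c @ Changed(SetKey) =>
      val elements: Set[String] = c.get(SetKey)
      s"elements=${elements.toList.sorted.mkString(",")}"
    case c @ Changed(CountKey) =>
      val count: Long = c.get(CountKey)
      s"count=$count"
    case _ => "unhandled"
  }
}
```

Inside each case the value comes back with its precise type (Set[String] or Long) without an explicit cast at the call site.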

Next we build an example that demonstrates reading, writing and monitoring several CRDT types:

object DDataUpdator {

  case object IncCounter
  case class AddToSet(item: String)
  case class AddToMap(item: String)
  case object ReadSet
  case object ReadMap
  case object ShutDownDData


  val KeyCounter = GCounterKey("counter")
  val KeySet = ORSetKey[String]("gset")
  val KeyMap = ORMultiMapKey[Long, String]("ormap")

  val timeout = 300.millis
  val writeAll = WriteAll(timeout)
  val readAll = ReadAll(timeout)


  def create(port: Int): ActorRef = {
    val config = ConfigFactory.parseString(s"akka.remote.netty.tcp.port = $port")
      .withFallback(ConfigFactory.load())
    val system = ActorSystem("DDataSystem",config)
    system.actorOf(Props[DDataUpdator],s"updator-$port")
  }

}

class DDataUpdator extends Actor with ActorLogging {
  import DDataUpdator._
  implicit val cluster = Cluster(context.system)
  val replicator = DistributedData(context.system).replicator


  replicator ! Subscribe(KeyCounter,self)
  replicator ! Subscribe(KeySet,self)
  replicator ! Subscribe(KeyMap,self)

  override def receive: Receive = {
    case IncCounter =>
       log.info(s"******* Incrementing counter... *****")
       replicator ! Update(KeyCounter,GCounter(),writeAll)(_ + 1)
    case UpdateSuccess(KeyCounter,_) =>
      log.info(s"********** Counter updated successfully ********")
    case UpdateTimeout(KeyCounter,_) =>
      log.info(s"******* Counter update timed out! *****")
    case ModifyFailure(KeyCounter,msg,err,_) =>
      log.info(s"******* Counter update failed with error: ${msg} *****")
    case StoreFailure(KeyCounter,_) =>
      log.info(s"******* Counter value store failed! *****")
    case c @ Changed(KeyCounter) ⇒
      val data = c.get(KeyCounter)
      log.info("********Current count: {}*******", data.getValue)


    case AddToSet(item) =>
      replicator ! Update(KeySet,ORSet.empty[String],writeAll)(_ + item)
    case UpdateSuccess(KeySet,_) =>
      log.info(s"**********Add to ORSet successfully ********")
    case UpdateTimeout(KeySet,_) =>
      log.info(s"******* Add to ORSet timed out! *****")
    case ModifyFailure(KeySet,msg,err,_) =>
      log.info(s"******* Add to ORSet failed with error: ${msg} *****")
    case StoreFailure(KeySet,_) =>
      log.info(s"******* ORSet items store failed! *****")
    case c @ Changed(KeySet) =>
      val data = c.get(KeySet)
      log.info("********Items in ORSet: {}*******", data.elements)
    case ReadSet =>
      replicator ! Get(KeySet,readAll)
    case g @ GetSuccess(KeySet, req) =>
      val value = g.get(KeySet)
      log.info("********Current items read in ORSet: {}*******", value.elements)
    case NotFound(KeySet, req) =>
      log.info("******No item found in ORSet!!!*******")



    case AddToMap(item) =>
       replicator ! Get(KeyCounter,readAll,Some(AddToMap(item)))
    case g @ GetSuccess(KeyCounter,Some(AddToMap(item))) =>
      val idx: Long = g.get(KeyCounter).getValue.longValue()
      log.info(s"*********** got counter=${idx} with item: $item ************")
      replicator ! Update(KeyMap,ORMultiMap.empty[Long,String],writeAll)(_ + (idx -> Set(item)))
      replicator ! Update(KeyCounter,GCounter(),writeAll)(_ + 1)
    case c @ Changed(KeyMap) =>
      val data = c.get(KeyMap).entries
      log.info("******** Items in ORMultiMap: {}*******", data)
    case ReadMap =>
      replicator ! Get(KeyMap,readAll)
    case g @ GetSuccess(KeyMap, req) =>
      val value = g.get(KeyMap)
      log.info("********Current items read in ORMultiMap: {}*******", value.entries)
    case NotFound(KeyMap, req) =>
      log.info("****** No item found in ORMultiMap!!! *******")



    case ShutDownDData => context.system.terminate()

  }
}

This example demonstrates the common operations on several kinds of CRDT. Now let's test it and look at the results:

object DDataDemo extends App {
  import DDataUpdator._

  val ud1 = create(2551)
  val ud2 = create(2552)
  val ud3 = create(2553)
  scala.io.StdIn.readLine()

  ud1 ! IncCounter
  ud2 ! AddToSet("Apple")
  ud1 ! AddToSet("Orange")

  scala.io.StdIn.readLine()

  ud2 ! IncCounter
  ud2 ! AddToSet("Pineapple")
  ud1 ! IncCounter
  ud1 ! AddToMap("Cat")

  scala.io.StdIn.readLine()

  ud1 ! AddToMap("Dog")
  ud2 ! AddToMap("Tiger")
  scala.io.StdIn.readLine()

  ud3 ! ReadSet
  ud3 ! ReadMap
  scala.io.StdIn.readLine()


   ud1 ! ShutDownDData
   ud2 ! ShutDownDData
   ud3 ! ShutDownDData
}

The output is as follows:

[INFO] [12/24/2018 08:33:40.500] [DDataSystem-akka.actor.default-dispatcher-16] [akka.tcp://DDataSystem@localhost:2551/user/updator-2551] ******* Incrementing counter... *****
[INFO] [12/24/2018 08:33:40.585] [DDataSystem-akka.actor.default-dispatcher-26] [akka.tcp://DDataSystem@localhost:2552/user/updator-2552] **********Add to ORSet successfully ********
[INFO] [12/24/2018 08:33:40.585] [DDataSystem-akka.actor.default-dispatcher-21] [akka.tcp://DDataSystem@localhost:2551/user/updator-2551] ********** Counter updated successfully ********
[INFO] [12/24/2018 08:33:40.585] [DDataSystem-akka.actor.default-dispatcher-21] [akka.tcp://DDataSystem@localhost:2551/user/updator-2551] **********Add to ORSet successfully ********
[INFO] [12/24/2018 08:33:40.726] [DDataSystem-akka.actor.default-dispatcher-19] [akka.tcp://DDataSystem@localhost:2551/user/updator-2551] ********Current count: 1*******
[INFO] [12/24/2018 08:33:40.726] [DDataSystem-akka.actor.default-dispatcher-19] [akka.tcp://DDataSystem@localhost:2551/user/updator-2551] ********Items in ORSet: Set(Orange, Apple)*******
[INFO] [12/24/2018 08:33:40.775] [DDataSystem-akka.actor.default-dispatcher-21] [akka.tcp://DDataSystem@localhost:2552/user/updator-2552] ********Items in ORSet: Set(Apple, Orange)*******
[INFO] [12/24/2018 08:33:40.775] [DDataSystem-akka.actor.default-dispatcher-21] [akka.tcp://DDataSystem@localhost:2552/user/updator-2552] ********Current count: 1*******
[INFO] [12/24/2018 08:33:40.829] [DDataSystem-akka.actor.default-dispatcher-23] [akka.tcp://DDataSystem@localhost:2553/user/updator-2553] ********Current count: 1*******
[INFO] [12/24/2018 08:33:40.829] [DDataSystem-akka.actor.default-dispatcher-23] [akka.tcp://DDataSystem@localhost:2553/user/updator-2553] ********Items in ORSet: Set(Apple, Orange)*******


[INFO] [12/24/2018 08:34:19.707] [DDataSystem-akka.actor.default-dispatcher-23] [akka.tcp://DDataSystem@localhost:2552/user/updator-2552] ******* Incrementing counter... *****
[INFO] [12/24/2018 08:34:19.707] [DDataSystem-akka.actor.default-dispatcher-19] [akka.tcp://DDataSystem@localhost:2551/user/updator-2551] ******* Incrementing counter... *****
[INFO] [12/24/2018 08:34:19.710] [DDataSystem-akka.actor.default-dispatcher-23] [akka.tcp://DDataSystem@localhost:2552/user/updator-2552] ********** Counter updated successfully ********
[INFO] [12/24/2018 08:34:19.711] [DDataSystem-akka.actor.default-dispatcher-19] [akka.tcp://DDataSystem@localhost:2551/user/updator-2551] ********** Counter updated successfully ********
[INFO] [12/24/2018 08:34:19.712] [DDataSystem-akka.actor.default-dispatcher-28] [akka.tcp://DDataSystem@localhost:2552/user/updator-2552] **********Add to ORSet successfully ********
[INFO] [12/24/2018 08:34:19.723] [DDataSystem-akka.actor.default-dispatcher-17] [akka.tcp://DDataSystem@localhost:2551/user/updator-2551] ********Current count: 3*******
[INFO] [12/24/2018 08:34:19.723] [DDataSystem-akka.actor.default-dispatcher-17] [akka.tcp://DDataSystem@localhost:2551/user/updator-2551] ********Items in ORSet: Set(Orange, Apple, Pineapple)*******
[INFO] [12/24/2018 08:34:19.733] [DDataSystem-akka.actor.default-dispatcher-19] [akka.tcp://DDataSystem@localhost:2551/user/updator-2551] *********** got counter=3 with item: Cat ************
[INFO] [12/24/2018 08:34:19.767] [DDataSystem-akka.actor.default-dispatcher-21] [akka.tcp://DDataSystem@localhost:2551/user/updator-2551] ********** Counter updated successfully ********
[INFO] [12/24/2018 08:34:19.772] [DDataSystem-akka.actor.default-dispatcher-19] [akka.tcp://DDataSystem@localhost:2552/user/updator-2552] ********Current count: 4*******
[INFO] [12/24/2018 08:34:19.773] [DDataSystem-akka.actor.default-dispatcher-19] [akka.tcp://DDataSystem@localhost:2552/user/updator-2552] ********Items in ORSet: Set(Apple, Orange, Pineapple)*******
[INFO] [12/24/2018 08:34:19.774] [DDataSystem-akka.actor.default-dispatcher-19] [akka.tcp://DDataSystem@localhost:2552/user/updator-2552] ******** Items in ORMultiMap: Map(3 -> Set(Cat))*******
[INFO] [12/24/2018 08:34:19.828] [DDataSystem-akka.actor.default-dispatcher-17] [akka.tcp://DDataSystem@localhost:2553/user/updator-2553] ********Current count: 4*******
[INFO] [12/24/2018 08:34:19.828] [DDataSystem-akka.actor.default-dispatcher-17] [akka.tcp://DDataSystem@localhost:2553/user/updator-2553] ********Items in ORSet: Set(Apple, Orange, Pineapple)*******
[INFO] [12/24/2018 08:34:19.828] [DDataSystem-akka.actor.default-dispatcher-17] [akka.tcp://DDataSystem@localhost:2553/user/updator-2553] ******** Items in ORMultiMap: Map(3 -> Set(Cat))*******
[INFO] [12/24/2018 08:34:20.222] [DDataSystem-akka.actor.default-dispatcher-21] [akka.tcp://DDataSystem@localhost:2551/user/updator-2551] ******** Items in ORMultiMap: Map(3 -> Set(Cat))*******
[INFO] [12/24/2018 08:34:20.223] [DDataSystem-akka.actor.default-dispatcher-21] [akka.tcp://DDataSystem@localhost:2551/user/updator-2551] ********Current count: 4*******

[INFO] [12/24/2018 08:34:45.918] [DDataSystem-akka.actor.default-dispatcher-25] [akka.tcp://DDataSystem@localhost:2552/user/updator-2552] *********** got counter=4 with item: Tiger ************
[INFO] [12/24/2018 08:34:45.919] [DDataSystem-akka.actor.default-dispatcher-16] [akka.tcp://DDataSystem@localhost:2551/user/updator-2551] *********** got counter=4 with item: Dog ************
[INFO] [12/24/2018 08:34:45.920] [DDataSystem-akka.actor.default-dispatcher-15] [akka.tcp://DDataSystem@localhost:2553/user/updator-2553] ********Current items read in ORSet: Set(Apple, Orange, Pineapple)*******
[INFO] [12/24/2018 08:34:45.922] [DDataSystem-akka.actor.default-dispatcher-22] [akka.tcp://DDataSystem@localhost:2553/user/updator-2553] ********Current items read in ORMultiMap: Map(3 -> Set(Cat))*******
[INFO] [12/24/2018 08:34:45.925] [DDataSystem-akka.actor.default-dispatcher-21] [akka.tcp://DDataSystem@localhost:2551/user/updator-2551] ********** Counter updated successfully ********
[INFO] [12/24/2018 08:34:45.926] [DDataSystem-akka.actor.default-dispatcher-27] [akka.tcp://DDataSystem@localhost:2552/user/updator-2552] ********** Counter updated successfully ********
[INFO] [12/24/2018 08:34:46.221] [DDataSystem-akka.actor.default-dispatcher-2] [akka.tcp://DDataSystem@localhost:2551/user/updator-2551] ******** Items in ORMultiMap: Map(4 -> Set(Dog, Tiger), 3 -> Set(Cat))*******
[INFO] [12/24/2018 08:34:46.221] [DDataSystem-akka.actor.default-dispatcher-2] [akka.tcp://DDataSystem@localhost:2551/user/updator-2551] ********Current count: 6*******
[INFO] [12/24/2018 08:34:46.272] [DDataSystem-akka.actor.default-dispatcher-27] [akka.tcp://DDataSystem@localhost:2552/user/updator-2552] ******** Items in ORMultiMap: Map(4 -> Set(Tiger, Dog), 3 -> Set(Cat))*******
[INFO] [12/24/2018 08:34:46.272] [DDataSystem-akka.actor.default-dispatcher-27] [akka.tcp://DDataSystem@localhost:2552/user/updator-2552] ********Current count: 6*******


[INFO] [12/24/2018 08:34:46.326] [DDataSystem-akka.actor.default-dispatcher-22] [akka.tcp://DDataSystem@localhost:2553/user/updator-2553] ******** Items in ORMultiMap: Map(4 -> Set(Dog, Tiger), 3 -> Set(Cat))*******
[INFO] [12/24/2018 08:34:46.326] [DDataSystem-akka.actor.default-dispatcher-22] [akka.tcp://DDataSystem@localhost:2553/user/updator-2553] ********Current count: 6*******

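Note that the last part of the output shows node 2553 seeing the ORSet and ORMultiMap data that was updated on other nodes. The entry Map(4 -> Set(Dog, Tiger)) was meant to come from two separate reads of the counter, each followed by an update under its own index. However, the two AddToMap messages were sent so close together that the counter increment triggered by the first had not yet been applied and replicated when the second one read the counter, so both reads returned 4 and both items were stored under key 4.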
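
The interleaving behind this result can be replayed in plain Scala. The sketch below is a simplified simulation, not Akka code: the counter is modeled as per-node counts merged by maximum, the multi-map as plain sets merged by union (a real ORMultiMap uses ORSet semantics), and the starting state (node n1 at 3, node n2 at 1) is an assumption chosen to match the log above. `CounterRaceSketch` and its members are hypothetical names.

```scala
// Simulation of two nodes that both read the counter before either
// increment has replicated. Simplified and hypothetical; not Akka code.
object CounterRaceSketch {
  type Counter  = Map[String, Long]        // per-node counts, as in a G-Counter
  type MultiMap = Map[Long, Set[String]]   // index -> items

  def value(c: Counter): Long = c.values.sum

  def inc(c: Counter, node: String): Counter =
    c.updated(node, c.getOrElse(node, 0L) + 1)

  // G-Counter style merge: per-node maximum.
  def mergeCounter(a: Counter, b: Counter): Counter =
    (a.keySet ++ b.keySet)
      .map(k => k -> math.max(a.getOrElse(k, 0L), b.getOrElse(k, 0L))).toMap

  // Simplified multi-map merge: union of the sets under each key.
  def mergeMap(a: MultiMap, b: MultiMap): MultiMap =
    (a.keySet ++ b.keySet)
      .map(k => k -> (a.getOrElse(k, Set.empty[String]) ++ b.getOrElse(k, Set.empty[String]))).toMap

  def run(): (Long, MultiMap) = {
    // State both nodes agree on before the two AddToMap requests arrive.
    val counter: Counter = Map("n1" -> 3L, "n2" -> 1L)   // value 4
    val items: MultiMap  = Map(3L -> Set("Cat"))

    // Both nodes read the counter before either increment is visible.
    val idx1 = value(counter)
    val idx2 = value(counter)

    // Each node then writes its item under the index it read and increments.
    val node1 = (inc(counter, "n1"), mergeMap(items, Map(idx1 -> Set("Dog"))))
    val node2 = (inc(counter, "n2"), mergeMap(items, Map(idx2 -> Set("Tiger"))))

    // Replication merges both sides without conflict.
    (value(mergeCounter(node1._1, node2._1)), mergeMap(node1._2, node2._2))
  }
}
```

Calling `CounterRaceSketch.run()` yields a counter of 6 and both Dog and Tiger under key 4, which matches the log. The CRDTs converge correctly; what goes wrong is the application-level read-then-write sequence, which is not atomic across the two keys.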

Here is the complete source code for this example:

build.sbt

name := "akka-distributed-data"

version := "0.1"

scalaVersion := "2.12.8"

libraryDependencies ++= Seq(
  "com.typesafe.akka" %% "akka-actor" % "2.5.19",
  "com.typesafe.akka" %% "akka-cluster-tools" % "2.5.19",
  "com.typesafe.akka" %% "akka-distributed-data" % "2.5.19"
)

resources/application.conf

akka {
   actor.provider = "cluster"
   remote {
     netty.tcp.port = 0
     netty.tcp.hostname = "localhost"
   }

  extensions = ["akka.cluster.ddata.DistributedData"]

  cluster {
    seed-nodes = [
      "akka.tcp://DDataSystem@localhost:2551",
      "akka.tcp://DDataSystem@localhost:2552"]

    # auto downing is NOT safe for production deployments.
    # you may want to use it during development, read more about it in the docs.
    #
    # auto-down-unreachable-after = 10s
  }

}

DDataUpdator.scala

import akka.actor._
import akka.cluster.ddata._
import Replicator._
import akka.cluster.Cluster
import com.typesafe.config.ConfigFactory

import scala.concurrent.duration._

object DDataUpdator {

  case object IncCounter
  case class AddToSet(item: String)
  case class AddToMap(item: String)
  case object ReadSet
  case object ReadMap
  case object ShutDownDData


  val KeyCounter = GCounterKey("counter")
  val KeySet = ORSetKey[String]("gset")
  val KeyMap = ORMultiMapKey[Long, String]("ormap")

  val timeout = 300.millis
  val writeAll = WriteAll(timeout)
  val readAll = ReadAll(timeout)


  def create(port: Int): ActorRef = {
    val config = ConfigFactory.parseString(s"akka.remote.netty.tcp.port = $port")
      .withFallback(ConfigFactory.load())
    val system = ActorSystem("DDataSystem",config)
    system.actorOf(Props[DDataUpdator],s"updator-$port")
  }

}

class DDataUpdator extends Actor with ActorLogging {
  import DDataUpdator._
  implicit val cluster = Cluster(context.system)
  val replicator = DistributedData(context.system).replicator


  replicator ! Subscribe(KeyCounter,self)
  replicator ! Subscribe(KeySet,self)
  replicator ! Subscribe(KeyMap,self)

  override def receive: Receive = {
    case IncCounter =>
       log.info(s"******* Incrementing counter... *****")
       replicator ! Update(KeyCounter,GCounter(),writeAll)(_ + 1)
    case UpdateSuccess(KeyCounter,_) =>
      log.info(s"********** Counter updated successfully ********")
    case UpdateTimeout(KeyCounter,_) =>
      log.info(s"******* Counter update timed out! *****")
    case ModifyFailure(KeyCounter,msg,err,_) =>
      log.info(s"******* Counter update failed with error: ${msg} *****")
    case StoreFailure(KeyCounter,_) =>
      log.info(s"******* Counter value store failed! *****")
    case c @ Changed(KeyCounter) ⇒
      val data = c.get(KeyCounter)
      log.info("********Current count: {}*******", data.getValue)


    case AddToSet(item) =>
      replicator ! Update(KeySet,ORSet.empty[String],writeAll)(_ + item)
    case UpdateSuccess(KeySet,_) =>
      log.info(s"**********Add to ORSet successfully ********")
    case UpdateTimeout(KeySet,_) =>
      log.info(s"******* Add to ORSet timed out! *****")
    case ModifyFailure(KeySet,msg,err,_) =>
      log.info(s"******* Add to ORSet failed with error: ${msg} *****")
    case StoreFailure(KeySet,_) =>
      log.info(s"******* ORSet items store failed! *****")
    case c @ Changed(KeySet) =>
      val data = c.get(KeySet)
      log.info("********Items in ORSet: {}*******", data.elements)
    case ReadSet =>
      replicator ! Get(KeySet,readAll)
    case g @ GetSuccess(KeySet, req) =>
      val value = g.get(KeySet)
      log.info("********Current items read in ORSet: {}*******", value.elements)
    case NotFound(KeySet, req) =>
      log.info("******No item found in ORSet!!!*******")



    case AddToMap(item) =>
       replicator ! Get(KeyCounter,readAll,Some(AddToMap(item)))
    case g @ GetSuccess(KeyCounter,Some(AddToMap(item))) =>
      val idx: Long = g.get(KeyCounter).getValue.longValue()
      log.info(s"*********** got counter=${idx} with item: $item ************")
      replicator ! Update(KeyMap,ORMultiMap.empty[Long,String],writeAll)(_ + (idx -> Set(item)))
      replicator ! Update(KeyCounter,GCounter(),writeAll)(_ + 1)
    case c @ Changed(KeyMap) =>
      val data = c.get(KeyMap).entries
      log.info("******** Items in ORMultiMap: {}*******", data)
    case ReadMap =>
      replicator ! Get(KeyMap,readAll)
    case g @ GetSuccess(KeyMap, req) =>
      val value = g.get(KeyMap)
      log.info("********Current items read in ORMultiMap: {}*******", value.entries)
    case NotFound(KeyMap, req) =>
      log.info("****** No item found in ORMultiMap!!! *******")



    case ShutDownDData => context.system.terminate()

  }
}


object DDataDemo extends App {
  import DDataUpdator._

  val ud1 = create(2551)
  val ud2 = create(2552)
  val ud3 = create(2553)
  scala.io.StdIn.readLine()

  ud1 ! IncCounter
  ud2 ! AddToSet("Apple")
  ud1 ! AddToSet("Orange")

  scala.io.StdIn.readLine()

  ud2 ! IncCounter
  ud2 ! AddToSet("Pineapple")
  ud1 ! IncCounter
  ud1 ! AddToMap("Cat")

  scala.io.StdIn.readLine()

  ud1 ! AddToMap("Dog")
  ud2 ! AddToMap("Tiger")
  scala.io.StdIn.readLine()

  ud3 ! ReadSet
  ud3 ! ReadMap
  scala.io.StdIn.readLine()


   ud1 ! ShutDownDData
   ud2 ! ShutDownDData
   ud3 ! ShutDownDData
}