
OpenDaylight Controller:MD-SAL:Architecture:Clustering


Contact information

  1. IRC: #opendaylight-clustering
  2. email: dev@lists.opendaylight.org with [clustering] in subject, please.

Requirements

Helium

  1. Move InMemoryDataStore implementation to its own bundle.
  2. Sharding : Distribute the data in the datastore into shards so that a subset of shards can be located in any cluster member. Distributed transactions across shards will not be supported. A transaction on a single shard must be supported. The Sharding strategy in Helium will be fixed, that is to say, all the data of a module will be in a single shard.
  3. Persistence : A shard must be backed by a persistent store so that when a cluster member is restarted, the shard can be reconstructed from the persisted data.
  4. Replication : A shard must be replicated to a configurable number (2 for Helium) of replicas.
  5. Clustering Services: An OSGi service which provides information about an Akka cluster
  6. Monitoring : There must be logging or monitoring of transactions and data change notifications that would help an app developer tune data access. For example, we know that remote reads could be expensive, so we must log reads that take a long time or that try to fetch too much data. Similarly, for data change notifications, we should log:
  • If a registration results in too many notifications
  • If a notification delivers too much data
  • If delivery takes too long
  7. Remote Rpc : A mechanism to invoke a method on remote MD-SAL providers
  8. Remote Notifications : A mechanism to deliver notifications to remote MD-SAL consumers

Beyond Helium

  1. Data Aggregation : Allow data from multiple shards to be returned in a single read request
  2. Querying : Allow querying for data using some sort of query API that works along with the aggregation
  3. Data Validator : Support validation of data that is being inserted into the shard

Questions on requirements

  1. What should the data size be?
  2. What kind of performance is expected? How many data updates will there be per second?

General Design Principles

  1. The Clustered/Sharded Data Store must be a drop-in replacement for the InMemory Data Store
  2. Reuse the InMemory Data Store to represent a shard. This must be so because it already takes care of maintaining a tree which contains data for all modules, and can therefore be scaled down to deal with data for a single module
  3. Use Akka for operations on remote shards. Akka seems to suit the existing design of MD-SAL as it is already based on the actor model.
  4. The sharding strategy must be open to customization. ODL must ship with a default sharding strategy, which will be static and predetermined for a given cluster size (similar to the APIC design); applications will be responsible for determining the sharding of their data. The sharding strategy determines the location of a shard, possibly based on the instance identifier of a data object and the collection of cluster members.
  5. Applications/services reading from or writing to a shard must be co-located with the shard to minimize remote transactions as much as possible. Only one application should be the logical owner of a shard. When other applications need that data, they should either utilize the APIs provided by the owner application or subscribe to data change notifications.
  6. In a cluster, at least three replicas must be configured for each shard (and they must be on different hosts). Only when a piece of data is replicated to at least one replica is the "transaction" considered successful.

Problems with the Sharding Strategy

  1. Must migration of shards be based on changes in the sharding strategy?
  2. For Helium - build a REST API to specify the cluster configuration

Akka

We are going to rely heavily on Akka to provide us the building blocks for our clustering solution. The main components of Akka that we will use are:

  1. Akka Remoting
  2. Akka Clustering
  3. Akka Persistence

Design

Components

General Concepts


The components are described below:
ClusteringConfiguration
The ClusteringConfiguration represents information about the cluster. The information it provides would roughly be the following:
  1. What are the members in the cluster?
  2. Which shards live on each node?
  3. What data goes in each shard?
ClusteringService
The ClusteringService would have the following responsibilities:
  1. Read the cluster configuration. Where it reads the cluster configuration from should not matter. Initially, we could even read the configuration from the file system. Over time, we could have a "primary" node come up with a cluster configuration, and distribute it to the other members in the cluster.
  2. Resolve the node name to the actual host name/ip.
  3. Maintain a registration of components that are interested in being notified of member status changes.
DistributedDataStore
The DistributedDataStore would have the following responsibilities:
  1. Implement the DOMStore so that we could replace the InMemoryDataStore with the DistributedDataStore.
  2. Create the local shard actors in accordance with the cluster configuration.
  3. Create the listener wrapper actors when a consumer registers a listener.
Shard
A Shard would be a processor which contains some of the data in the system. A Shard being an actor, communication with it would be by means of messages. The messages passed to a shard would be very similar to the operations on the DOMStore interface.

Since the Shard is a Processor (in akka-persistence terms), it is a special actor which, when passed a Persistent message, will log it to a journal. This journal, along with snapshots, would be used as a method to recover the state of the DataStore. The state of the Shard would be maintained in an InMemoryDataStore object.

The MD-SAL DataStore supports a three phase commit. The Shard, therefore, also provides the functions of the ThreePhaseCommitCohort.

ShardTransaction
A ShardTransaction would be an actor which wraps an InMemoryDataStoreTransaction. Any operation that needs to be done on a transaction, namely "read", "write", "delete", and "ready", would be fronted by the ShardTransaction. The ShardTransaction will also maintain the state of any writes/deletes that happen on a transaction. This state will be called the "transactionLog". The transactionLog would then be used during commits to persist a transaction to a journal. The journal will be written to disk using the persistence module of Akka. The journal will then be used when a controller starts up to reconstruct the state of a shard.
TransactionProxy
The TransactionProxy will hold a reference to a collection of remote ShardTransaction actors, and when returned to the consumer of the DistributedDataStore, could be used to invoke the transaction operations on any remote ShardTransaction object depending on the instance identifier of the object.
ListenerWrapper
The ListenerWrapper is an actor that would represent a local data change listener. It would be created as a remote actor on the node where the Data Change registration is done.
ListenerProxy
The ListenerProxy represents a remote data change listener. When the local Shard issues a data change notification, it is the responsibility of the ListenerProxy to send that data change notification over to the remote ListenerWrapper actor.
ShardCommitCohort
A ShardCommitCohort would be an actor which wraps the InMemoryDataStoreCommitCohort. Any operation that needs to be done on a three phase commit cohort, namely, "canCommit", "preCommit", "commit", and "abort" would be fronted by the ShardCommitCohort.
ThreePhaseCommitCohortProxy
The ThreePhaseCommitCohortProxy holds a reference to a collection of ShardCommitCohorts. It implements the DOMStoreThreePhaseCommitCohort interface and any operation done on the proxy is invoked on every ShardCommitCohort in the collection.
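As a rough illustration of that fan-out behaviour, the sketch below shows how such a proxy could drive each commit phase across all of its cohorts. The interface and class names are simplified stand-ins for illustration only, not the actual MD-SAL types or signatures.

import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;

// Simplified stand-in for the three-phase commit contract; the real
// MD-SAL cohort interface differs in types and future abstractions.
interface CommitCohort {
    CompletableFuture<Boolean> canCommit();
    CompletableFuture<Void> preCommit();
    CompletableFuture<Void> commit();
    CompletableFuture<Void> abort();
}

// The proxy simply fans each phase out to every cohort in its collection.
class ThreePhaseCommitCohortProxySketch implements CommitCohort {
    private final List<CommitCohort> cohorts;

    ThreePhaseCommitCohortProxySketch(List<CommitCohort> cohorts) {
        this.cohorts = cohorts;
    }

    @Override
    public CompletableFuture<Boolean> canCommit() {
        // The commit can proceed only if every cohort votes yes.
        CompletableFuture<Boolean> result = CompletableFuture.completedFuture(true);
        for (CommitCohort cohort : cohorts) {
            result = result.thenCombine(cohort.canCommit(), (a, b) -> a && b);
        }
        return result;
    }

    @Override
    public CompletableFuture<Void> preCommit() { return invokeAll(CommitCohort::preCommit); }

    @Override
    public CompletableFuture<Void> commit() { return invokeAll(CommitCohort::commit); }

    @Override
    public CompletableFuture<Void> abort() { return invokeAll(CommitCohort::abort); }

    // Invoke the given phase on every cohort and complete when all complete.
    private CompletableFuture<Void> invokeAll(Function<CommitCohort, CompletableFuture<Void>> phase) {
        CompletableFuture<?>[] futures = cohorts.stream().map(phase).toArray(CompletableFuture[]::new);
        return CompletableFuture.allOf(futures);
    }
}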

Packaging

The following OSGi bundles must be created:

  1. MD-SAL InMemoryDataStore Implementation (This needs to be moved out of sal-dom-broker.)
  2. MD-SAL Clustering Service API and implementation
  3. MD-SAL Distributed DataStore, Remote Rpc Provider, and Remote Notifications

Configuration

Cluster configuration defines the members (nodes) of the cluster and what lives within it. This configuration can be static or dynamic. To make things simple, we could go with a static configuration for Helium. The configuration could be defined in a file or files which could be put in the ODL distribution. When the ODL controller is started, the configuration file could be passed to it.

When the MD-SAL Clustering Service bundle comes up, it could determine which specific configuration needs to be loaded, read it from disk, and initialize itself.

Clustering configuration would be as follows:

modules.conf

modules.conf defines how modules are placed in separate shards. The format of the configuration is as follows:

{
  name = "<friendly_name_of_module>"
  namespace = "<the yang namespace of the module>"
  shard-strategy = "module"
}

In Helium the only shard-strategy we support is "module", which puts all the data of a single module in two shards (one for config and one for operational data). So in the example below, the module "inventory" is placed on two shards: a config shard and an operational data shard. Likewise for the "topology" module.


modules = [
  {
    name = "inventory"
    namespace = "urn:opendaylight:inventory"
    shard-strategy = "module"
  },
  {
    name = "topology"
    namespace = "urn:TBD:params:xml:ns:yang:network-topology"
    shard-strategy = "module"
  },
  {
    name = "toaster"
    namespace = "http://netconfcentral.org/ns/toaster"
    shard-strategy = "module"
  }
]

Example of modules.conf

module-shards.conf

This file describes which shards live on which members (nodes of a cluster) and the cluster members on which replicas of those shards exist. Which replica is primary depends on the order of the replica list.

The format for module-shards.conf is as follows:

 {
    name = "<friendly_name_of_the_module>"
    shards = [
        {
            name = "<any_name_that_is_unique_for_the_module>"
            replicas = [
                "<name_of_member_on_which_to_run>"
            ]
        }
    ]
 }

Example of module-shards.conf

module-shards = [
    {
        name = "default"
        shards = [
            {
                name="default"
                replicas = [
                    "member-1"
                    "member-2"
                    "member-3"
                    
                ]
            }
        ]
    },
    {
        name = "topology"
        shards = [
            {
                name="topology"
                replicas = [
                    "member-3"
                    "member-2"
                    "member-1"
                ]
            }
        ]
    },
    {
        name = "inventory"
        shards = [
            {
                name="inventory"
                replicas = [
                    "member-2"
                    "member-1"
                    "member-3"
                ]
            }
        ]
    },
    {
        name = "toaster"
        shards = [
            {
                name="toaster"
                replicas = [
                    "member-1"
                    "member-2"
                    "member-3"
                ]
            }
        ]
    }

]

The replicas section is a collection of cluster member (node) names. This information is used to decide on which members (nodes) the replicas of a particular shard will be located. Since replication has been integrated with the distributed data store, this section can have multiple entries.

module-sharding-strategies.conf

During the clustering POC (proof of concept), the module-sharding-strategies.conf file defined each module and the strategy to be used for that module. In Helium this file is no longer used.

module-sharding-strategies = [
    {
        module-name : "inventory"
        strategy : "module"
    },
    {
        module-name : "topology"
        strategy : "module"
    }

]

As Akka is to be used, special attention must be paid to the "role-name". The role-name that one uses must correspond to the role-name specified for this node in the akka-cluster configuration. This is a potential area for mistakes, as two separate configuration files need to be kept in sync (a clean solution for this still needs to be thought through).

Discovery

ClusteringService will be responsible for Discovery and all related functions. It depends on akka-clustering to identify the members of the cluster.

When the ClusteringService comes up, it first checks for the state of the cluster. It looks up all the members in the cluster and verifies that all the roles defined in the cluster-configuration are fulfilled by the cluster membership. Once all the members with the required roles are up and running, the Clustering Service notifies its listeners that the controller is open for business.

Sharding (and data access)

The DistributedDataStore creates a ShardManager. The ShardManager looks at the configuration of the cluster, and automatically creates all the local shards. The ShardManager also provides a mechanism to locate the shard to which a message needs to be sent.

Data is sharded at a sub-tree level. In other words, if a tree node belongs to a shard, all of its sub-tree nodes will also belong to the same shard. The sharding strategy will be pluggable at a module level. The default sharding strategy will be to allocate a shard per top-level module. It is expected that a few modules, such as Inventory and Topology, will be the heavy hitters on the data store, and their data may need to be further sharded at a sub-module level. In those cases, the sharding strategy can specify the path prefix to shard on. When data needs to be read from or written to a shard, the ShardManager will pass the module name and the instance identifier of the data to a ShardStrategy, which will then locate the shard to which the data belongs.
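As an illustration of the pluggable strategy described above, here is a minimal sketch of how a shard could be located from a path. The interface name and the list-of-strings path representation are hypothetical simplifications for this sketch, not the actual MD-SAL types.

import java.util.List;
import java.util.Map;

// Illustrative contract: resolve a data path to the name of the shard that owns it.
interface ShardStrategy {
    String findShard(List<String> path);
}

// Default Helium-style behaviour: everything belonging to a module goes to the
// single shard named after that module.
class ModuleShardStrategy implements ShardStrategy {
    private final String moduleName;

    ModuleShardStrategy(String moduleName) {
        this.moduleName = moduleName;
    }

    @Override
    public String findShard(List<String> path) {
        return moduleName;
    }
}

// A possible sub-module refinement for heavy modules: shard further by a
// configured path prefix, falling back to a default shard.
class PrefixShardStrategy implements ShardStrategy {
    private final Map<String, String> prefixToShard;
    private final String defaultShard;

    PrefixShardStrategy(Map<String, String> prefixToShard, String defaultShard) {
        this.prefixToShard = prefixToShard;
        this.defaultShard = defaultShard;
    }

    @Override
    public String findShard(List<String> path) {
        // Longest-matching-prefix lookup over the path elements.
        StringBuilder prefix = new StringBuilder();
        String match = defaultShard;
        for (String element : path) {
            prefix.append('/').append(element);
            String shard = prefixToShard.get(prefix.toString());
            if (shard != null) {
                match = shard;
            }
        }
        return match;
    }
}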

ShardManagement.png

Creating a new transaction

Create a new transaction.png

In the "current option", when a consumer tries to create a new transaction on the DistributedDataStore, we have to create a transaction on some remote Shard(s). Why do we need to create a transaction on multiple Shards? Because when the transaction is created using the current DOMStore API, we are not told at the outset which "module" the transaction will operate on.

There are a few ways in which we could fix this:

  1. When creating the transaction, pass the instance identifier of the object on which you want to do the transaction. This is a simple option, but it introduces a more restrictive API that forces consumers to decide up-front the Shard on which they would like to operate.
  2. Do not create remote transactions up-front. When a CRUD operation is done on the TransactionProxy, the TransactionProxy could first create a transaction on the remote Shard and only then perform that operation. Once the transaction is created, it is allowed to live until it is committed. This is also workable, and the overall behavior may not be worse than the current option (see the sketch below).
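To make the second option concrete, the sketch below shows a transaction proxy that creates the remote per-shard transaction lazily, on the first operation that touches a given shard. Actor messaging is elided and all types here are hypothetical.

import java.util.HashMap;
import java.util.Map;

class LazyTransactionProxySketch {

    // Hypothetical handle on a per-shard transaction; in the real design this
    // would wrap messaging with a remote ShardTransaction actor.
    interface RemoteShardTransaction {
        byte[] read(String path);
        void write(String path, byte[] data);
    }

    interface ShardLocator { String shardFor(String path); }                       // resolves a path to a shard name
    interface ShardConnector { RemoteShardTransaction createTransaction(String shardName); } // opens a transaction on a remote shard

    private final Map<String, RemoteShardTransaction> perShardTx = new HashMap<>();
    private final ShardLocator locator;
    private final ShardConnector connector;

    LazyTransactionProxySketch(ShardLocator locator, ShardConnector connector) {
        this.locator = locator;
        this.connector = connector;
    }

    // The remote transaction is created only when the first operation for that
    // shard arrives, then reused until the transaction is readied/committed.
    private RemoteShardTransaction txFor(String path) {
        String shard = locator.shardFor(path);
        return perShardTx.computeIfAbsent(shard, connector::createTransaction);
    }

    public byte[] read(String path) {
        return txFor(path).read(path);
    }

    public void write(String path, byte[] data) {
        txFor(path).write(path, data);
    }
}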

Read/Write on a transaction

Read on a transaction.png


Write on a transaction.png

Readying a transaction for commit

Readying a transaction for commit.png

Committing a transaction

The plan is to use 3-phase commit semantics for committing transactions. The 3-phase commit protocol works as shown in the following diagram. This would imply that we guarantee distributed transactions, but in reality we may not be able to. If a 3-phase commit is not important or not possible, it should probably not be included on the DOMStore interfaces.

If a 3-phase commit is not to be supported, it is possible to simply have a commit on the transaction.

Three-phase commit diagram.png

The coordinator in our case would be the ThreePhaseCommitCohortProxy (shown as ThreePhaseCommitProxy in the following diagram) and the actual cohorts will be the ShardCommitCohorts.

Committing a transaction.png

Replication

To be implemented using the RAFT consensus algorithm https://ramcloud.stanford.edu/wiki/download/attachments/11370504/raft.pdf

Aggregation

If data from a single module is put into multiple shards, it is possible that a read may require data from multiple shards to be retrieved, aggregated, and returned to the consumer.

<TBD>

Querying

With the current DOM-Store, the only type of read supported is reading a piece of data based on its instance identifier. Querying for data based on attributes of the data is not supported. This capability needs to be added to the DOM-Store interfaces and optimized (indexed) for fast access.

The following types of queries will be supported:

  • Query based on instance identifier
  • Query based on object class (Extend YANG with the notion of an Object Class?)
  • Query based on attribute filters
  • Scoped query based on Object Class
  • Scoped query on attribute filters

The Data Store will implement B-Tree Indexes to support efficient queries based on attribute filters. YANG data modelers are in the best position to define the expected query patterns, and therefore define the indexes which need to be created. YANG language extensions will be provided to simplify the definition of query indexes.
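As a simple illustration of such an index (not the planned implementation), a sorted map from attribute value to matching instance identifiers supports both exact and range lookups; all types here are illustrative stand-ins.

import java.util.Collection;
import java.util.Collections;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;

// Hypothetical attribute index: a sorted (B-tree-like) map from an attribute
// value to the identifiers of nodes carrying that value.
class AttributeIndexSketch {
    private final TreeMap<String, Set<String>> valueToIds = new TreeMap<>();

    // Record that the node at instanceIdentifier carries this attribute value.
    void index(String attributeValue, String instanceIdentifier) {
        valueToIds.computeIfAbsent(attributeValue, v -> new TreeSet<>()).add(instanceIdentifier);
    }

    // Exact-match lookup for an attribute filter.
    Set<String> lookup(String attributeValue) {
        return valueToIds.getOrDefault(attributeValue, Collections.emptySet());
    }

    // Range lookup: all identifier sets whose attribute value lies in [from, to].
    Collection<Set<String>> range(String from, String to) {
        return valueToIds.subMap(from, true, to, true).values();
    }
}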

Data Change Notifications

Registration

Data change notifications can be thought of as continuous queries where the query is specified once and instead of returning the matching data immediately, the system sends notifications when matching data appear in the data tree. As such, data change subscriptions can be customized just like the data queries.

The following types of subscriptions will be supported:

  • Query based on instance identifier
  • Query based on object class (Extend YANG with the notion of an Object Class?)
  • Query based on attribute filters
  • Scoped query based on Object Class
  • Scoped query on attribute filters

Question: Do we need to support the notion of a custom filter, where a piece of Java code supplied by the consumer is invoked for every matching node during notification, and only those nodes which pass the filter are returned?
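If such a filter were supported, the consumer-supplied code could be as small as a single-method callback. The sketch below is purely hypothetical and uses a plain String to stand in for a data tree node.

// Hypothetical consumer-supplied filter: invoked for every matching node during
// notification; only nodes for which it returns true would be delivered.
interface DataChangeFilter<T> {
    boolean accept(T candidateNode);
}

// Example: only deliver notifications for nodes whose identifier contains a
// given fragment (the node type is just a String for illustration).
class SubstringFilter implements DataChangeFilter<String> {
    private final String fragment;

    SubstringFilter(String fragment) {
        this.fragment = fragment;
    }

    @Override
    public boolean accept(String candidateNode) {
        return candidateNode.contains(fragment);
    }
}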

Register a DataChangeListener.png

Notification

DataChange notification.png

Shard Primary Replica Election

As per the RAFT consensus algorithm https://ramcloud.stanford.edu/wiki/download/attachments/11370504/raft.pdf

Data Validation

As data is added into the data store, application developers may need to write a validator to verify whether the data being written is correct. While Validators and DataCommitHandlers are both invoked during a commit, there is a difference between the two. The DataCommitHandler is a broker concept whereas the Validator is a DataStore concept. Validators would be associated with a single shard, and remote registrations of Validators would not be allowed.

<More TBD>

Recovery

To enable recovery, the persistence module of Akka could be used. One caveat to using the persistence module of Akka is that it is experimental. Only a POC will determine if it is usable for our purposes. If it is not usable, there may be a need to roll our own, but, in general, the principles would be the same.

The following are requirements to enable proper recovery:

  1. Write a journal where each successful transaction on a given shard is logged.
  2. From time to time, write the state of the datastore as a snapshot. The idea of using a snapshot is to enable faster recovery.
  3. When the controller is restarted, first re-construct the state of the local shard from the saved snapshot, then replay the transactions in the transaction journal onto the datastore. When both are complete, the Shard is ready for business.

The above can be provided by Akka.
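The recovery sequence itself can be sketched independently of the Akka Persistence APIs: restore the latest snapshot, then replay only the journal entries recorded after it. The types below are illustrative stand-ins, assuming a simple path-to-bytes view of shard state.

import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Illustrative journal entry: one committed write on the shard.
class JournalEntry {
    final long sequenceNumber;
    final String path;
    final byte[] data;

    JournalEntry(long sequenceNumber, String path, byte[] data) {
        this.sequenceNumber = sequenceNumber;
        this.path = path;
        this.data = data;
    }
}

// Illustrative snapshot: the full shard state as of a journal sequence number.
class Snapshot {
    final long upToSequenceNumber;
    final Map<String, byte[]> state;

    Snapshot(long upToSequenceNumber, Map<String, byte[]> state) {
        this.upToSequenceNumber = upToSequenceNumber;
        this.state = state;
    }
}

class ShardRecoverySketch {
    // Recovery: start from the snapshot, then replay only the journal entries
    // recorded after the snapshot was taken.
    static Map<String, byte[]> recover(Snapshot snapshot, List<JournalEntry> journal) {
        Map<String, byte[]> state = new HashMap<>(snapshot.state);
        for (JournalEntry entry : journal) {
            if (entry.sequenceNumber > snapshot.upToSequenceNumber) {
                state.put(entry.path, entry.data);
            }
        }
        return state; // the shard is now ready for business
    }
}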

In addition, we will also need to ensure that the replica being recovered is in sync with the primary replica. One way to ensure this would be for the primary replica to send all "undelivered" messages from its transaction log to the current replica.

Availability

High availability will be enabled by the following:

  1. Replicating shard data to a configurable number of replicas.
  2. Detecting failure of nodes and switching the primary replica.
  3. Using a fixed shard priority order to determine which secondary replica becomes the leader.
Replication

After a successful local commit on the primary replica, a replication message would be sent to all the secondary replicas. The secondary replicas would write this message into the journal and then commit the message as a transaction on the InMemoryDataStore. A Transaction is not considered to be complete for external purposes until the data is written to the replication journal on at least one replica.
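Under this scheme, the "at least one replica" rule amounts to the primary waiting for the first acknowledgement from a secondary before reporting success. A minimal sketch, with hypothetical types and the actual messaging layer elided:

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch: the primary sends the replication message to every
// secondary and treats the transaction as complete once the first secondary
// has written it to its replication journal.
class ReplicationAckTrackerSketch {
    private final CountDownLatch firstAck = new CountDownLatch(1);

    // Called by the messaging layer when a secondary confirms the journal write.
    void onSecondaryAck(String memberName) {
        firstAck.countDown();
    }

    // Returns true if at least one secondary acknowledged within the timeout;
    // only then is the transaction reported as successful to the caller.
    boolean awaitFirstAck(long timeoutMillis) throws InterruptedException {
        return firstAck.await(timeoutMillis, TimeUnit.MILLISECONDS);
    }
}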

Node Failure detection

The ClusteringService would monitor the state of all the members of a cluster. When it detects a node failure, it will notify its listeners of the failure of that node. One of its listeners would be the DistributedDataStore, which on receipt of the failure will send a message to all its TransactionProxys informing them of the failure of that node. The failure would then be propagated forward by the TransactionProxys to the ThreePhaseCommitCohortProxy. If the node failure affects a transaction, that is to say, if the failure is on a node where one of the transaction's shards resides, then the transaction will be marked as failed, and any further action on it will throw an exception. The same applies to the ThreePhaseCommitCohortProxy.

Monitoring

An Akka cluster can be monitored using a variety of commercial monitoring software such as AppDynamics or NewRelic. TypeSafe used to have a product called TypeSafe Console, which has been discontinued.


Performance Measurement/Tuning

Concurrency

Akka has the concept of a dispatcher, which is essentially the means by which Akka processes messages for an actor. What kind of threading model do we want to use with our actors? Akka offers a few configurable choices.

Serialization

Serializing objects over the wire is an expensive operation. We need to figure out which type of serialization works best for us.

Remote RPC

When there is a cluster of controllers, there can be cases where one member of the cluster is asked to execute an RPC call on a device which is controlled by another member. The remote RPC broker would route such requests to the member which controls the device. In general, any RPC request for which a provider is not found locally is routed to another member in the cluster that has a provider for it.

There are two components:

  1. Route Registry : Maintains the list of registered RPCs per member of the cluster.
  2. Remote RPC Broker : Routes RPC calls to the cluster member where the RPC is registered and handles the response. It also acts as a listener for incoming RPC requests from other cluster members.



Route Registry

This is a container for registered RPCs per cluster member (controller).

// The registry maps each cluster member's Address to the Bucket of RPCs registered on that member.
var registry: mutable.Map[Address, Bucket] = mutable.Map.empty[Address, Bucket]

// A Bucket holds a member's registered RPC routes together with a version (timestamp) used during gossip reconciliation.
final case class Bucket( version: Long, rpcs: List[String] )

Address: Cluster member address. It is really the Akka address of the remote actor system where the Route Registry actor is managed.
Bucket: It is a container for the list of registered RPCs with a version. Version is a timestamp.

The registry is replicated across all members of the cluster. Replication is done using the Gossip protocol and follows the eventual consistency model.

The registry is front-ended by an Actor that talks to the corresponding Actor on other members. This Actor can handle three kinds of messages:

  1. GossipTick - Sent by the local scheduler; it triggers the actor to send its registry Status to a randomly selected remote member.
  2. Status - This message contains the member Addresses and the versions of their corresponding Buckets that the sender has.
  3. Delta - This message contains the delta between the sender's registry and the local member's registry. The local registry can be updated based on this delta.



A member can update *only its own* bucket as and when RPCs are registered/unregistered on it.
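A rough sketch of the anti-entropy exchange described above, using hypothetical types rather than the POC code: on a GossipTick the local member sends its bucket versions (Status) to a random peer, the peer answers with a Delta of newer buckets, and the receiver merges them.

import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical in-memory model of the registry: one bucket per member,
// versioned with a timestamp, listing the RPCs registered on that member.
class BucketSketch {
    final long version;
    final List<String> rpcs;

    BucketSketch(long version, List<String> rpcs) {
        this.version = version;
        this.rpcs = rpcs;
    }
}

class RouteRegistrySketch {
    private final Map<String, BucketSketch> registry = new HashMap<>();

    // GossipTick: summarise what we know as member -> bucket version (the Status message).
    Map<String, Long> status() {
        Map<String, Long> versions = new HashMap<>();
        registry.forEach((member, bucket) -> versions.put(member, bucket.version));
        return versions;
    }

    // On receiving a Status, compute the Delta: every bucket the sender is
    // missing or holds at an older version.
    Map<String, BucketSketch> deltaFor(Map<String, Long> remoteVersions) {
        Map<String, BucketSketch> delta = new HashMap<>();
        registry.forEach((member, bucket) -> {
            Long remote = remoteVersions.get(member);
            if (remote == null || remote < bucket.version) {
                delta.put(member, bucket);
            }
        });
        return delta;
    }

    // On receiving a Delta, merge newer buckets into the local registry.
    void merge(Map<String, BucketSketch> delta) {
        delta.forEach((member, bucket) -> {
            BucketSketch local = registry.get(member);
            if (local == null || local.version < bucket.version) {
                registry.put(member, bucket);
            }
        });
    }

    // Only the local member's own bucket may be replaced directly, when RPCs
    // are registered or unregistered on it.
    void updateOwnBucket(String selfMember, BucketSketch bucket) {
        registry.put(selfMember, bucket);
    }
}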

[TODO] Add call flow

Remote RPC Broker

The main functions of this component are:

  1. Routing RPC requests to the "right" members and collecting responses
  2. Acting as a listener for RPC requests coming in from remote members, delivering them to the MD-SAL Broker, and responding with the result



The component can be broken down into two parts, corresponding to these two functions.
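The routing decision itself is essentially a registry lookup. A minimal sketch under hypothetical types (not the actual broker code), preferring a local provider when one exists:

import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;

// Hypothetical routing sketch: pick a member that has the requested RPC
// registered; the local member is preferred when it can serve the call.
class RpcRouterSketch {
    private final String selfMember;
    private final Map<String, List<String>> rpcsByMember; // member name -> registered RPCs

    RpcRouterSketch(String selfMember, Map<String, List<String>> rpcsByMember) {
        this.selfMember = selfMember;
        this.rpcsByMember = rpcsByMember;
    }

    Optional<String> route(String rpcName) {
        // Serve locally if a local provider exists.
        if (rpcsByMember.getOrDefault(selfMember, Collections.emptyList()).contains(rpcName)) {
            return Optional.of(selfMember);
        }
        // Otherwise route to any remote member advertising the RPC.
        return rpcsByMember.entrySet().stream()
                .filter(e -> e.getValue().contains(rpcName))
                .map(Map.Entry::getKey)
                .findFirst();
    }
}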

Open Questions/Random Thoughts

Why can we not use an existing Distributed Data Store instead of rolling our own?

  1. Most distributed DBs do not support transactions - not even transactions on a single shard. We do intend to support transactions on a given shard.
  2. It is not clear that existing DBs would even perform well enough - they certainly cannot perform as well as our in-memory data store.
  3. External DBs generally do not provide data change notifications.
  4. Using an external DB would make deployment a little more complicated. We would have to set up ODL and also the external DB. Some people like the current deployment simplicity of ODL.
  5. One of the principles we want to follow in this model is to discourage data reads and promote data delivery (via change notifications). The advantage of fast reads offered by a high-performing external DB such as Mongo would therefore become largely irrelevant.


Notes regarding Sharding design

The design of sharding should be done carefully, based on the queries applications make, noting that it will be painful (migration will be involved) if we want to change the sharding logic after release.

Clustering Scenarios

The scenarios below describe what the clustering implementation will do, given a certain cluster state. The following assumptions have been made:

  1. All the local shard replicas of the first node to come up become primary replicas.
  2. A primary replica will stay the primary replica unless it is deemed as down by the cluster.
  3. A write is considered successful only if it is successfully written to the journal of the primary and the journal of any one secondary replica.
  4. Cluster operations are suspended if the number of secondary replicas that can be written to is zero.
  5. All replicas of a shard negotiate with each other as to which replica should be the primary.

Scenario 1

    - Single node cluster
    - Primary replica for all shards local
    - No secondary replicas


  • Replication : off

Scenario 2

    - Two node cluster
    - Both nodes running
  • Replication : on

Scenario 3 [follows Scenario 2]

    - Two node cluster
    - Node 1 running
    - Node 2 running -> down
  • Node 1 : primary
  • Cluster operations : suspended

Scenario 4 [follows Scenario 3]

    - Two node cluster
    - Node 1 running
    - Node 2 down -> running
    
  • Node 1 : primary
  • Node 1 replicates all data to Node 2
  • Cluster operations : resumed

Scenario 5 [follows Scenario 2]

    - Two node cluster
    - Node 1 running -> down
    - Node 2 running
  • Node 2 : primary
  • Cluster operations : suspended

Scenario 6 [follows Scenario 5]

    - Two node cluster
    - Node 1 down -> running
    - Node 2 running


  • Node 2 : primary
  • Cluster operations : resumed

Scenario 7

    - Two node cluster
    - Node 2 comes up first
    - Node 1 comes up second


  • Node 2 : primary

Scenario 8

    - Three node cluster
    - Node 1 comes up first 
    - Node 2 comes up second
    - Node 3 comes up third
  • Node 1 : primary

Scenario 9 [follows Scenario 8]

    - Three node cluster
    - Node 1 running
    - Node 2 running -> down
    - Node 3 running
  • Node 1 : primary
  • Node 1 : starts storing messages destined for Node 2
  • Node 3 : saves replicated messages for Node 2
  • Node 3 : fully replicated

Scenario 10 [follows Scenario 9]

    - Three node cluster
    - Node 1 running
    - Node 2 down -> running
    - Node 3 running
  • Node 1 : primary
  • Node 1 : replicates stored messages to Node 2
  • Node 3 : discards the stored messages for Node 2
  • Node 3 : fully replicated

Scenario 11 [follows Scenario 9]

    - Three node cluster
    - Node 1 running -> down
    - Node 2 down -> running
    - Node 3 running


  • Node 3 : primary
  • Node 3 : replicates stored messages to Node 2

Scenario 12 [follows Scenario 11]

   - Three Node cluster
   - Node 1 down -> running
   - Node 2 running
   - Node 3 running
  • Node 3 : primary
  • Node 1 : discards any stored messages for Node 2

Scenario 13 [follows Scenario 8]

   - Three Node cluster
   - Node 1 running -> down
   - Node 2 running -> down
   - Node 3 running
  • Node 3 : primary
  • Cluster operations : suspended

Scenario 14 [follows Scenario 8]

   - Three Node cluster
   - Node 1 running -> down
   - Send Persistent messages to a shard
   - Node 2 running -> down (with unreplicated messages)
   - Node 3 running -> down (with unreplicated messages)
   - Node 1 down -> running
  • Node 1 : primary
  • Cluster operations : suspended

Scenario 15 [follows Scenario 14]

   - Three Node Cluster
   - Node 1 running
   - Node 2 down -> running (is more up-to-date)
   - Node 3 down
  • Node 2 : primary

Proof of Concept

Goals

  • Figure out if Akka can be leveraged for clustering [Done]
  • Validate design concepts [Done]
  • Make design choices [Done]
  • Estimate performance characteristics [Done]

Focus Areas

  • Data Distribution / Sharding
    • Determine location of Shard [Done]
    • Akka Clustering [Done]
    • Akka Remoting [Done]
    • Akka Sharding
    • Aggregation (Scatter Gather)
  • Persistence / Recovery
    • Akka Persistence [Done]
  • Replication / High Availability [Done]
  • Querying / Indexing
  • Serviceability (Monitoring and Diagnosis)
    • Akka atmos
  • Data Change Notification (Query like Filters)
  • Serialization over the wire
    • Google Protocol Buffers
    • EXI
    • BSON
  • Data Validators (nothing to do with DataCommitHandlers)
  • Fault Tolerance
    • Akka Supervision [Done]
  • Remote Rpc
    • Remote Rpc Registry and update using Gossip [Done]
  • Expose an actor using OSGi [Done]

References

DOM Data Store

Trello

Status Update

Date Description
07/01/2014

Completed

  • Move InMemoryDataStore into its own bundle
  • Implemented a Distributed DataStore which wraps the in-memory data store using Akka actors. Only a single Shard is used.

In Progress

  • Code reviews and merging of commits
  • Further testing (mininet + cbench)
  • Serialization of Normalized Node and conversion of all messages to protocol buffers serialization (target Jul 8th)
  • Modify the Distributed DataStore to use multiple shards instead of the single shard (target Jul 8th)
  • Implementation of Remote Rpc provider (target Jul 9th)
  • Build/Borrow an implementation of RAFT for Replication (target Jul 25th)

Help Needed

  • Build a monitoring solution with Dashboard

Trying out the Distributed Data Store

  • Build an Openflowplugin distribution
  • Pull changes up to this gerrit https://git.opendaylight.org/gerrit/#/c/8427/
  • Go to opendaylight/md-sal/sal-distributed-datastore and run mvn clean install
  • Copy target/sal-distributed-datastore*.jar to the Openflowplugin distribution (this should also work on most integration distributions)
  • In the distribution folder, edit config/initial/01-md-sal.xml and follow the instructions to use the distributed datastore instead of the in-memory datastore
  • Run the distribution using the command ./run.sh -Dakka.loglevel=debug -Dshard.persistent=false -Xmx4G -Xms2G -XX:NewRatio=5 -XX:+UseG1GC -XX:MaxPermSize=256m
08/04/2014

Completed

  • Multiple shard support
  • Distributing shards across cluster
  • Replication
  • Persistence of shard
  • Remote RPC

Trying out the Distributed Data Store

Please follow the instructions on this page: Running and testing an OpenDaylight Cluster

Ongoing

  • Documenting test cases
  • Stability testing
  • Performance tuning and testing
  • Test automation

Clustering Hackers' Meeting

This information is transcluded from OpenDaylight Controller/Clustering/Meeting; go there to see meeting minutes and upcoming agendas.

Time and Location:
  • 08:00am Pacific, Tuesdays.
  • IRC: #opendaylight-clustering
  • Join and email the dev list with the topic [clustering].