Adds support for Jaccard bag/multiset semantics #5

Open · wants to merge 1 commit into master
2 changes: 2 additions & 0 deletions README.md
@@ -149,6 +149,8 @@ The previously described data can easily be converted into Spark's SparseVector

- *minClusterSize* - a post-processing filter that excludes clusters smaller than this threshold.

- *repeatedItems* - optional boolean indicating that items can appear more than once in each set, in which case Jaccard similarity is computed over [multisets](https://en.wikipedia.org/wiki/Multiset) (bag semantics). Defaults to false. Note that under bag semantics the maximum possible similarity of any two sets is 0.5 rather than 1; see the sketch below this list, and [Mining of Massive Datasets](http://mmds.org/), chapter 3, pp. 76-77, for more details.

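To see where the 0.5 ceiling comes from, here is a minimal sketch using plain Scala collections rather than this library's API (the item values are hypothetical): even two identical multisets share every item, yet the bag union counts each item twice.

// multiset Jaccard by hand: each element of the list is one occurrence of an item
val a = List(7, 7, 9)                    // item 7 twice, item 9 once
val b = List(7, 7, 9)                    // an identical multiset
val intersection = a.intersect(b).size   // multiset intersection (min of counts) -> 3
val union = (a ++ b).size                // multiset union (sum of counts)        -> 6
val sim = intersection.toDouble / union  // 3.0 / 6 = 0.5, the highest value possible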
There are two ways to execute LSH. The first is a driver class submitted to a Spark cluster (which can be a single machine running in local mode). The second is Spark's REPL; the latter is useful for parameter tuning.

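For example, a parameter-tuning session in the REPL might look like the following sketch. The data and parameter values here are hypothetical, and sc is the SparkContext the shell provides:

import org.apache.spark.mllib.linalg.{Vectors, SparseVector}
import com.invincea.spark.hash.LSH

// two small sets encoded as (index, value) pairs
val sets = List(Seq((1, 1.0), (5, 1.0), (9, 1.0)), Seq((1, 1.0), (5, 1.0)))
val vctr = sc.parallelize(sets).map(s => Vectors.sparse(1000, s).asInstanceOf[SparseVector])

val lsh = new LSH(data = vctr, p = 1019, m = 1000, numRows = 1000, numBands = 100, minClusterSize = 2)
val model = lsh.run

model.scores.collect()   // inspect cluster similarities while tuning numRows / numBands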

39 changes: 32 additions & 7 deletions src/main/scala/com/invincea/spark/hash/LSH.scala
@@ -5,13 +5,14 @@ import org.apache.spark.rdd.RDD
import scala.collection.mutable.ListBuffer
import org.apache.spark.SparkContext._

-class LSH(data : RDD[SparseVector], p : Int, m : Int, numRows : Int, numBands : Int, minClusterSize : Int) extends Serializable {
+class LSH(data : RDD[SparseVector], p : Int, m : Int, numRows : Int, numBands : Int, minClusterSize : Int, repeatedItems : Boolean = false)
+  extends Serializable {

/** run LSH using the constructor parameters */
def run() : LSHModel = {

//create a new model object
-    val model = new LSHModel(p, m, numRows)
+    val model = new LSHModel(p, m, numRows, repeatedItems)

//preserve vector index
val zdata = data.zipWithIndex().cache()
@@ -38,14 +39,16 @@ class LSH(data : RDD[SparseVector], p : Int, m : Int, numRows : Int, numBands :
model.clusters = zdata.map(x => x.swap).join(model.vector_cluster).map(x => (x._2._2, x._2._1)).groupByKey().cache()

//compute the jaccard similarity of each cluster
-    model.scores = model.clusters.map(row => (row._1, jaccard(row._2.toList))).cache()
+    model.scores = model.clusters
+      .map(row => (row._1, if (model.hasRepeatedItems) jaccardBag(row._2.toList) else jaccard(row._2.toList))).cache()

model
}

/** compute a single vector against an existing model */
def compute(data : SparseVector, model : LSHModel, minScore : Double) : RDD[(Long, Iterable[SparseVector])] = {
-    model.clusters.map(x => (x._1, x._2++List(data))).filter(x => jaccard(x._2.toList) >= minScore)
+    model.clusters.map(x => (x._1, x._2++List(data)))
+      .filter(x => (if (model.hasRepeatedItems) jaccardBag(x._2.toList) else jaccard(x._2.toList)) >= minScore)
}

/** compute jaccard between two vectors */
@@ -59,6 +62,28 @@
def jaccard(l : List[SparseVector]) : Double = {
  l.foldLeft(l(0).indices.toList)((a1, b1) => a1.intersect(b1.indices.toList)).size /
    l.foldLeft(List[Int]())((a1, b1) => a1.union(b1.indices.toList)).distinct.size.doubleValue
}

/** compute jaccard similarity between two vectors using bag/multiset semantics,
* where the indices represent items and the values represent the
* number of times each item is repeated in the set */
def jaccardBag(a : SparseVector, b : SparseVector) : Double = {
val a1 = getItems(a)
val b1 = getItems(b)
a1.intersect(b1).size / a1.union(b1).size.doubleValue
}
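// worked example with hypothetical values:
//   a: indices [4, 8], values [2.0, 1.0]  =>  getItems(a) == List(4, 4, 8)
//   b: indices [4, 8], values [1.0, 3.0]  =>  getItems(b) == List(4, 8, 8, 8)
// intersection takes the minimum of each item's counts -> List(4, 8), size 2
// union sums the counts -> size 3 + 4 = 7, so jaccardBag(a, b) == 2 / 7.0 ≈ 0.286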

/** compute jaccard similarity over a list of vectors using bag/multiset semantics */
def jaccardBag(l : List[SparseVector]) : Double = {
  l.foldLeft(getItems(l(0)))((a1, b1) => a1.intersect(getItems(b1))).size /
    l.foldLeft(List[Int]())((a1, b1) => a1.union(getItems(b1))).size.doubleValue
}

/** convert a SparseVector where indices represent items and
* values represent the times each item appears in the set
* to a list */
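// e.g. (hypothetical) indices [2, 5] with values [3.0, 1.0] yield List(2, 2, 2, 5)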
def getItems(xs : SparseVector) : List[Int] = {
xs.indices.zip(xs.values)
.flatMap(pair => List.fill(pair._2.toInt)(pair._1)).toList
}
}
5 changes: 4 additions & 1 deletion src/main/scala/com/invincea/spark/hash/LSHModel.scala
@@ -6,7 +6,7 @@ import scala.collection.mutable.ListBuffer
import org.apache.spark.SparkContext._


-class LSHModel(p : Int, m : Int, numRows : Int) extends Serializable {
+class LSHModel(p : Int, m : Int, numRows : Int, repeatedItems : Boolean = false) extends Serializable {

/** generate rows hash functions */
private val _hashFunctions = ListBuffer[Hasher]()
@@ -31,6 +31,9 @@ class LSHModel(p : Int, m : Int, numRows : Int) extends Serializable {

/** jaccard cluster scores */
var scores : RDD[(Long, Double)] = null

/** whether vectors may contain repeated items, i.e. whether to use bag semantics for jaccard */
var hasRepeatedItems : Boolean = repeatedItems

/** filter out scores below threshold. this is an optional step.*/
def filter(score : Double) : LSHModel = {
26 changes: 26 additions & 0 deletions src/test/scala/com/invincea/spark/hash/LSHSuite.scala
@@ -36,4 +36,30 @@ class LSHSuite extends FunSuite with LocalSparkContext {
assert(model.filter(0.85).clusters.count() == 1)

}

test("repeated items test") {
// suppose each key represents a movie and the value is a rating,
// which repeatedItems treats as the item's multiplicity
val ratings = List(List((47,5.0), (33,2.0), (54,1.0), (23,5.0)),
List((47,5.0), (33,2.0), (54,1.0), (23,4.0), (92,1.0)),
List((54,3.0), (123,5.0), (82,1.0), (536,3.0), (92,3.0), (101,2.0)),
List((45,2.0), (46,5.0), (47,4.0), (50,1.0), (54,2.0), (75,5.0), (23,4.0), (82,2.0)))

val rdd = sc.parallelize(ratings)

// make sure all 4 rating sets are in the RDD
assert(rdd.count() == 4)

val vctr = rdd.map(a => Vectors.sparse(1000, a).asInstanceOf[SparseVector])

val lsh = new LSH(data = vctr, p = 1019, m = 1000, numRows = 1000, numBands = 100, minClusterSize = 2, repeatedItems = true)
val model = lsh.run

// this should return 1 cluster
assert(model.clusters.count() == 1)

// The cluster similarity should be greater than 0.45 (12/26 ≈ 0.462; see below)
// The ceiling on jaccard similarity under bag semantics is 0.5, so this is actually very high
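// where 12/26 comes from, for the two rating sets expected to cluster (the first two):
//   intersection takes the min of each movie's counts: 5 + 2 + 1 + 4 = 12
//   union sums the counts: 13 + 13 = 26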
assert(model.filter(0.45).clusters.count() == 1)
}
}