-
Notifications
You must be signed in to change notification settings - Fork 315
Description
Hello Team,
When I try to serialize a Spark PipelineModel to an MLeap bundle, I get the exception below:
java.util.NoSuchElementException: key not found: org.apache.spark.ml.PipelineModel
at scala.collection.immutable.Map$Map2.apply(Map.scala:227)
at ml.combust.bundle.BundleRegistry.opForObj(BundleRegistry.scala:102)
at ml.combust.bundle.BundleWriter.$anonfun$save$1(BundleWriter.scala:28)
at scala.Option.getOrElse(Option.scala:189)
at ml.combust.bundle.BundleWriter.save(BundleWriter.scala:28)
at ml.combust.bundle.BundleWriter.$anonfun$save$3(BundleWriter.scala:41)
at resource.AbstractManagedResource.$anonfun$acquireFor$1(AbstractManagedResource.scala:88)
at scala.util.control.Exception$Catch.$anonfun$either$1(Exception.scala:252)
at scala.util.control.Exception$Catch.apply(Exception.scala:228)
at scala.util.control.Exception$Catch.either(Exception.scala:252)
at resource.AbstractManagedResource.acquireFor(AbstractManagedResource.scala:88)
at resource.ManagedResourceOperations.apply(ManagedResourceOperations.scala:26)
at resource.ManagedResourceOperations.apply$(ManagedResourceOperations.scala:26)
at resource.AbstractManagedResource.apply(AbstractManagedResource.scala:50)
at resource.DeferredExtractableManagedResource.$anonfun$tried$1(AbstractManagedResource.scala:33)
at scala.util.Try$.apply(Try.scala:213)
at resource.DeferredExtractableManagedResource.tried(AbstractManagedResource.scala:33)
at ml.combust.bundle.BundleWriter.save(BundleWriter.scala:40)
at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
at scala.util.Try$.apply(Try.scala:213)
at scala.util.Success.flatMap(Try.scala:251)
at scala.util.Try$WithFilter.flatMap(Try.scala:142)
at scala.util.Success.flatMap(Try.scala:251)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.base/java.lang.reflect.Method.invoke(Method.java:566)
at org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52)
at org.apache.spark.deploy.SparkSubmit.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:958)
at org.apache.spark.deploy.SparkSubmit.doRunMain$1(SparkSubmit.scala:180)
at org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:203)
at org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:90)
at org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:1046)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:1055)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Versions: MLeap 0.23.1, Spark 3.4.0, XGBoost 1.7.6.
Following is the code used to serialize the Spark PipelineModel to an MLeap bundle:
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.ml.{PipelineModel, Transformer}
// MLeap/Bundle.ML Serialization Libraries
import org.apache.spark.ml.bundle.SparkBundleContext
import ml.combust.mleap.spark.SparkSupport._
import scala.util.{Failure, Success, Try}
import java.net.URI
/**
 * Serializes a fitted Spark `PipelineModel` to an MLeap bundle zip on the local file system.
 *
 * @param bucketName    not used by this function's body
 * @param minioPath     not used by this function's body
 * @param bestModel     the fitted pipeline to serialize
 * @param transformedDf DataFrame attached to the bundle context via `withDataset`
 *                      (assumed to be the output of `bestModel.transform` — TODO confirm)
 * @param localPath     local path of the bundle zip; defaults to the previously hard-coded path
 * @return `Success(())` once the bundle is written, otherwise the `Failure` raised while building
 *         the bundle context, building the URI, or saving the bundle
 */
def saveModelAsMleapBundle(
    bucketName: String,
    minioPath: String,
    bestModel: PipelineModel,
    transformedDf: DataFrame,
    localPath: String = "/tmp/modelu.zip"): Try[Unit] = {
  // NOTE(review): "key not found: org.apache.spark.ml.PipelineModel" thrown from
  // BundleRegistry.opForObj means the op registry loaded from configuration has no entry for
  // PipelineModel. It does not indicate a problem in this call. The trace fails inside
  // scala.collection.immutable.Map$Map2 (a map with exactly two entries), which suggests only two
  // ops were registered. Check which reference.conf entries are visible at runtime.
  // A fat/shadow jar can overwrite rather than merge reference.conf files. TODO confirm.
  for {
    sbc <- Try(SparkBundleContext().withDataset(transformedDf))
    // save(...) already returns a Try, so chain it instead of calling .get inside another Try.
    _ <- bestModel.writeBundle.save(new URI(s"jar:file:$localPath"))(sbc)
  } yield ()
}
Following is the code used to create the model:
// Stage 1: index the columns listed in labelEncoderInputCols into labelEncoderOutputCols,
// using Spark's "keep" strategy for invalid entries instead of failing.
val labelEncoderPipelineStage = new StringIndexer()
  .setInputCols(labelEncoderInputCols)
  .setOutputCols(labelEncoderOutputCols)
  .setHandleInvalid("keep")

// Stage 2: assemble every column in allFeatureColumns into the single "features" column,
// again with the "keep" strategy for invalid entries.
val assemblerPipelineStage = new VectorAssembler()
  .setInputCols(allFeatureColumns)
  .setOutputCol("features")
  .setHandleInvalid("keep")
/**
 * Builds the XGBoost hyper-parameter map that is passed (via `.toMap`) to `XGBoostClassifier`.
 *
 * A fresh map is built on every call, so callers may mutate the result freely.
 * The map is built in one expression. The previous statement-by-statement version did not
 * compile: its type arguments were lost, four lines had stray trailing commas, and one line used
 * an undefined `param`.
 *
 * @return a new mutable map from XGBoost parameter name to value
 */
def get_param(): mutable.HashMap[String, Any] =
  mutable.HashMap[String, Any](
    "objective" -> "multi:softprob",
    "num_class" -> 7,
    "tree_method" -> "auto",
    "num_workers" -> 3,
    "num_early_stopping_rounds" -> 3,
    "maximize_evaluation_metrics" -> false,
    "verbosity" -> 3,
    "missing" -> 0.0,
    "eta" -> 0.2,
    "seed" -> 50
  )
// Create an XGBoost classifier from the hyper-parameter map returned by get_param().
// NOTE(review): the original `.setLabelCol()` had no argument and does not compile. The label
// column is assumed to be the same numeric label column the evaluator scores (label_column).
// Confirm it is the indexed label produced by the StringIndexer stage.
val xgb = (new XGBoostClassifier(get_param().toMap)
  .setFeaturesCol("features")
  .setLabelCol(label_column))

// Hyper-parameter grid searched by the cross-validator. Each value array must match its param's
// element type: in xgboost4j-spark `missing` is a FloatParam and `gamma`/`subsample` are
// DoubleParams.
// NOTE(review): the original grid also contained `.addGrid(xgb.numRound, Array(0.8, 0.9))`.
// numRound is an integer count of boosting rounds, so fractional values cannot be used. Add
// integer round counts here if tuning the number of rounds was intended.
val xgbParamGrid = (new ParamGridBuilder()
  .addGrid(xgb.missing, Array(0.0f))
  .addGrid(xgb.maxDepth, Array(16))
  .addGrid(xgb.eta, Array(0.2))
  .addGrid(xgb.gamma, Array(0.0))
  .addGrid(xgb.subsample, Array(0.6, 0.65, 0.7))
  .addGrid(xgb.colsampleBytree, Array(1.0))
  .addGrid(xgb.minChildWeight, Array(1.0))
  .build())

// Map the numeric prediction back to its original string label.
val labelConverterPipelineStage = (new IndexToString()
  .setInputCol(predictioncolumn)
  .setOutputCol(prediction_indextostringcolumn)
  .setLabels(labelsArray))

val pipeline = new Pipeline().setStages(
  Array(labelEncoderPipelineStage, assemblerPipelineStage, xgb, labelConverterPipelineStage))

// The model predicts one class per row (multi:softprob, num_class = 7), so it is scored with the
// multiclass evaluator. MultilabelClassificationEvaluator expects array-typed label and
// prediction columns.
val evaluator = (new org.apache.spark.ml.evaluation.MulticlassClassificationEvaluator()
  .setLabelCol(label_column)
  .setPredictionCol("prediction")
  .setMetricName("accuracy"))

// Cross-validate the whole pipeline (3 folds) over xgbParamGrid, scoring each candidate with the
// multiclass accuracy evaluator.
val cv = (new CrossValidator()
  .setEstimator(pipeline)
  .setEvaluator(evaluator)
  .setEstimatorParamMaps(xgbParamGrid)
  .setNumFolds(3))

// Fit on the training data. The estimator is a Pipeline, so the best model should be a
// PipelineModel; fail with a descriptive error otherwise.
val cvModel = cv.fit(trainDF)
val bestModel: PipelineModel = cvModel.bestModel match {
  case pipelineModel: PipelineModel => pipelineModel
  case other =>
    throw new IllegalStateException(
      s"Expected CrossValidator to produce a PipelineModel but got ${other.getClass.getName}")
}
The model is created successfully, but serializing it to an MLeap bundle fails with the exception above.
Following are the entries in my src/resources/reference.conf:
# XGBoost ops for the Spark registry. The `+=` below appends this list's key to
# ml.combust.mleap.spark.registry.default.ops.
# NOTE(review): the stack trace fails in BundleRegistry.opForObj inside
# scala.collection.immutable.Map$Map2, a map holding exactly two entries. That matches the two ops
# listed here and suggests only this list was registered at runtime. If the job runs from a
# fat/shadow jar, check that every reference.conf is concatenated rather than overwritten.
# TODO confirm.
ml.combust.mleap.spark.xgboost.ops = [
"ml.dmlc.xgboost4j.scala.spark.mleap.XGBoostClassificationModelOp",
"ml.dmlc.xgboost4j.scala.spark.mleap.XGBoostRegressionModelOp"
]
ml.combust.mleap.spark.registry.default.ops += "ml.combust.mleap.spark.xgboost.ops"
# Built-in Spark-side ops (org.apache.spark.ml.bundle.ops.*). The `+=` at the end of this block
# appends this list's key to ml.combust.mleap.spark.registry.default.ops.
# The last entry, PipelineOp, is presumably the op for org.apache.spark.ml.PipelineModel, which is
# the class the "key not found" error reports as missing.
# NOTE(review): if this list were actually loaded, that lookup should succeed. Verify at runtime
# that this file is on the classpath and not overwritten during packaging.
ml.combust.mleap.spark.registry.builtin-ops = [
"org.apache.spark.ml.bundle.ops.classification.DecisionTreeClassifierOp",
"org.apache.spark.ml.bundle.ops.classification.NaiveBayesClassifierOp",
"org.apache.spark.ml.bundle.ops.classification.MultiLayerPerceptronClassifierOp",
"org.apache.spark.ml.bundle.ops.classification.OneVsRestOp",
"org.apache.spark.ml.bundle.ops.classification.RandomForestClassifierOp",
"org.apache.spark.ml.bundle.ops.clustering.GaussianMixtureOp",
"org.apache.spark.ml.bundle.ops.clustering.KMeansOp",
"org.apache.spark.ml.bundle.ops.clustering.BisectingKMeansOp",
"org.apache.spark.ml.bundle.ops.clustering.LDAModelOp",
"org.apache.spark.ml.bundle.ops.feature.BinarizerOp",
"org.apache.spark.ml.bundle.ops.feature.BucketizerOp",
"org.apache.spark.ml.bundle.ops.feature.ChiSqSelectorOp",
"org.apache.spark.ml.bundle.ops.feature.CountVectorizerOp",
"org.apache.spark.ml.bundle.ops.feature.DCTOp",
"org.apache.spark.ml.bundle.ops.feature.ElementwiseProductOp",
"org.apache.spark.ml.bundle.ops.feature.HashingTermFrequencyOp",
"org.apache.spark.ml.bundle.ops.feature.IDFOp",
"org.apache.spark.ml.bundle.ops.feature.InteractionOp",
"org.apache.spark.ml.bundle.ops.feature.MaxAbsScalerOp",
"org.apache.spark.ml.bundle.ops.feature.MinMaxScalerOp",
"org.apache.spark.ml.bundle.ops.feature.NGramOp",
"org.apache.spark.ml.bundle.ops.feature.NormalizerOp",
"org.apache.spark.ml.bundle.ops.feature.PcaOp",
"org.apache.spark.ml.bundle.ops.feature.PolynomialExpansionOp",
"org.apache.spark.ml.bundle.ops.feature.ReverseStringIndexerOp",
"org.apache.spark.ml.bundle.ops.feature.StandardScalerOp",
"org.apache.spark.ml.bundle.ops.feature.StopWordsRemoverOp",
"org.apache.spark.ml.bundle.ops.feature.TokenizerOp",
"org.apache.spark.ml.bundle.ops.feature.VectorAssemblerOp",
"org.apache.spark.ml.bundle.ops.feature.VectorIndexerOp",
"org.apache.spark.ml.bundle.ops.feature.VectorSlicerOp",
"org.apache.spark.ml.bundle.ops.feature.WordToVectorOp",
"org.apache.spark.ml.bundle.ops.feature.RegexTokenizerOp",
"org.apache.spark.ml.bundle.ops.regression.AFTSurvivalRegressionOp",
"org.apache.spark.ml.bundle.ops.regression.DecisionTreeRegressionOp",
"org.apache.spark.ml.bundle.ops.regression.GBTRegressionOp",
"org.apache.spark.ml.bundle.ops.regression.GeneralizedLinearRegressionOp",
"org.apache.spark.ml.bundle.ops.regression.IsotonicRegressionOp",
"org.apache.spark.ml.bundle.ops.regression.LinearRegressionOp",
"org.apache.spark.ml.bundle.ops.regression.RandomForestRegressionOp",
"org.apache.spark.ml.bundle.ops.recommendation.ALSOp",
"org.apache.spark.ml.bundle.ops.tuning.CrossValidatorOp",
"org.apache.spark.ml.bundle.ops.tuning.TrainValidationSplitOp",
"org.apache.spark.ml.bundle.ops.feature.MinHashLSHOp",
"org.apache.spark.ml.bundle.ops.feature.BucketedRandomProjectionLSHOp",
"org.apache.spark.ml.bundle.ops.classification.GBTClassifierOp",
"org.apache.spark.ml.bundle.ops.classification.LogisticRegressionOp",
"org.apache.spark.ml.classification.bundle.ops.LinearSVCOp",
"org.apache.spark.ml.bundle.ops.feature.FeatureHasherOp",
"org.apache.spark.ml.bundle.ops.feature.StringIndexerOp",
"org.apache.spark.ml.bundle.ops.feature.OneHotEncoderOp",
"org.apache.spark.ml.bundle.ops.PipelineOp"
]
ml.combust.mleap.spark.registry.default.ops += "ml.combust.mleap.spark.registry.builtin-ops"
# Spark-side MLeap extension ops (org.apache.spark.ml.bundle.extension.ops.*). The `+=` at the end
# of this block appends this list's key to ml.combust.mleap.spark.registry.default.ops.
ml.combust.mleap.spark.extension.ops = [
"org.apache.spark.ml.bundle.extension.ops.classification.OneVsRestOp",
"org.apache.spark.ml.bundle.extension.ops.classification.SupportVectorMachineOp",
"org.apache.spark.ml.bundle.extension.ops.feature.ImputerOp",
"org.apache.spark.ml.bundle.extension.ops.feature.MathBinaryOp",
"org.apache.spark.ml.bundle.extension.ops.feature.MathUnaryOp",
"org.apache.spark.ml.bundle.extension.ops.feature.MultinomialLabelerOp",
"org.apache.spark.ml.bundle.extension.ops.feature.WordLengthFilterOp",
"org.apache.spark.ml.bundle.extension.ops.feature.StringMapOp"
]
ml.combust.mleap.spark.registry.default.ops += "ml.combust.mleap.spark.extension.ops"
# MLeap runtime built-in ops (ml.combust.mleap.bundle.ops.*). The `+=` at the end of this block
# appends this list's key to ml.combust.mleap.registry.default.ops.
# These keys configure the runtime registry (ml.combust.mleap.registry.*). That registry is
# separate from the Spark registry (ml.combust.mleap.spark.registry.*).
ml.combust.mleap.registry.builtin-ops = [
"ml.combust.mleap.bundle.ops.classification.DecisionTreeClassifierOp",
"ml.combust.mleap.bundle.ops.classification.GBTClassifierOp",
"ml.combust.mleap.bundle.ops.classification.LogisticRegressionOp",
"ml.combust.mleap.bundle.ops.classification.NaiveBayesClassifierOp",
"ml.combust.mleap.bundle.ops.classification.MultiLayerPerceptronClassifierOp",
"ml.combust.mleap.bundle.ops.classification.OneVsRestOp",
"ml.combust.mleap.bundle.ops.classification.RandomForestClassifierOp",
"ml.combust.mleap.bundle.ops.classification.SupportVectorMachineOp",
"ml.combust.mleap.bundle.ops.classification.LinearSVCOp",
"ml.combust.mleap.bundle.ops.clustering.GaussianMixtureOp",
"ml.combust.mleap.bundle.ops.clustering.KMeansOp",
"ml.combust.mleap.bundle.ops.clustering.BisectingKMeansOp",
"ml.combust.mleap.bundle.ops.clustering.LDAModelOp",
"ml.combust.mleap.bundle.ops.feature.BinarizerOp",
"ml.combust.mleap.bundle.ops.sklearn.BinarizerOp",
"ml.combust.mleap.bundle.ops.feature.BucketedRandomProjectionLSHOp",
"ml.combust.mleap.bundle.ops.feature.BucketizerOp",
"ml.combust.mleap.bundle.ops.feature.ChiSqSelectorOp",
"ml.combust.mleap.bundle.ops.feature.CoalesceOp",
"ml.combust.mleap.bundle.ops.feature.CountVectorizerOp",
"ml.combust.mleap.bundle.ops.feature.DCTOp",
"ml.combust.mleap.bundle.ops.feature.ElementwiseProductOp",
"ml.combust.mleap.bundle.ops.feature.FeatureHasherOp",
"ml.combust.mleap.bundle.ops.feature.HashingTermFrequencyOp",
"ml.combust.mleap.bundle.ops.feature.IDFOp",
"ml.combust.mleap.bundle.ops.feature.ImputerOp",
"ml.combust.mleap.bundle.ops.feature.InteractionOp",
"ml.combust.mleap.bundle.ops.feature.MapEntrySelectorOp",
"ml.combust.mleap.bundle.ops.feature.MathBinaryOp",
"ml.combust.mleap.bundle.ops.feature.MathUnaryOp",
"ml.combust.mleap.bundle.ops.feature.MaxAbsScalerOp",
"ml.combust.mleap.bundle.ops.feature.MinHashLSHOp",
"ml.combust.mleap.bundle.ops.feature.MinMaxScalerOp",
"ml.combust.mleap.bundle.ops.feature.MultinomialLabelerOp",
"ml.combust.mleap.bundle.ops.feature.NGramOp",
"ml.combust.mleap.bundle.ops.feature.NormalizerOp",
"ml.combust.mleap.bundle.ops.feature.OneHotEncoderOp",
"ml.combust.mleap.bundle.ops.feature.PcaOp",
"ml.combust.mleap.bundle.ops.feature.PolynomialExpansionOp",
"ml.combust.mleap.bundle.ops.sklearn.PolynomialFeaturesOp",
"ml.combust.mleap.bundle.ops.feature.ReverseStringIndexerOp",
"ml.combust.mleap.bundle.ops.feature.StandardScalerOp",
"ml.combust.mleap.bundle.ops.feature.StopWordsRemoverOp",
"ml.combust.mleap.bundle.ops.feature.StringIndexerOp",
"ml.combust.mleap.bundle.ops.feature.StringMapOp",
"ml.combust.mleap.bundle.ops.feature.TokenizerOp",
"ml.combust.mleap.bundle.ops.feature.VectorAssemblerOp",
"ml.combust.mleap.bundle.ops.feature.VectorIndexerOp",
"ml.combust.mleap.bundle.ops.feature.VectorSlicerOp",
"ml.combust.mleap.bundle.ops.feature.WordToVectorOp",
"ml.combust.mleap.bundle.ops.feature.RegexTokenizerOp",
"ml.combust.mleap.bundle.ops.feature.RegexIndexerOp",
"ml.combust.mleap.bundle.ops.feature.WordLengthFilterOp",
"ml.combust.mleap.bundle.ops.regression.AFTSurvivalRegressionOp",
"ml.combust.mleap.bundle.ops.regression.DecisionTreeRegressionOp",
"ml.combust.mleap.bundle.ops.regression.GBTRegressionOp",
"ml.combust.mleap.bundle.ops.regression.GeneralizedLinearRegressionOp",
"ml.combust.mleap.bundle.ops.regression.IsotonicRegressionOp",
"ml.combust.mleap.bundle.ops.regression.LinearRegressionOp",
"ml.combust.mleap.bundle.ops.regression.RandomForestRegressionOp",
"ml.combust.mleap.bundle.ops.ensemble.CategoricalDrilldownOp",
"ml.combust.mleap.bundle.ops.recommendation.ALSOp",
"ml.combust.mleap.bundle.ops.PipelineOp"
]
ml.combust.mleap.registry.default.ops += "ml.combust.mleap.registry.builtin-ops"
# MLeap runtime XGBoost ops. The `+=` below appends this list's key to
# ml.combust.mleap.registry.default.ops.
# These entries configure the runtime registry (ml.combust.mleap.registry.*), not the Spark
# registry (ml.combust.mleap.spark.registry.*).
ml.combust.mleap.xgboost.ops = [
"ml.combust.mleap.xgboost.runtime.bundle.ops.XGBoostClassificationOp",
"ml.combust.mleap.xgboost.runtime.bundle.ops.XGBoostRegressionOp"
]
ml.combust.mleap.registry.default.ops += "ml.combust.mleap.xgboost.ops"
I have added the following MLeap dependencies to my Gradle build:
// MLeap 0.23.1 artifacts; the Scala suffix comes from the build's scalaVersion property.
// NOTE(review): these jars may each ship their own reference.conf. If the application is packaged
// as a fat/shadow jar, those files need to be concatenated, for example with the Shadow plugin's
// `append 'reference.conf'` transformer. Otherwise registry entries can be lost. TODO confirm how
// the job is packaged.
implementation group: 'ml.combust.mleap', name : "mleap-xgboost-spark_${scalaVersion}", version: '0.23.1'
implementation group: 'ml.combust.mleap', name: "mleap-xgboost-runtime_${scalaVersion}", version: '0.23.1'
implementation group: 'ml.combust.mleap', name : "mleap-spark_${scalaVersion}", version: '0.23.1'
implementation group: 'ml.combust.mleap', name: "mleap-spark-extension_${scalaVersion}", version: '0.23.1'
implementation group: 'ml.combust.mleap', name: "mleap-runtime_${scalaVersion}", version: '0.23.1'
Please let me know if I need to add any additional configuration to resolve this error.