Rozwiązałem swój problem. Powodem niespójnych liczeń był MongoDefaultPartitioner który otacza MongoSamplePartitioner który wykorzystuje losowe pobieranie próbek. Szczerze mówiąc, jak dla mnie jest to dość dziwny błąd. Osobiście wolałbym zamiast tego mieć powolny, ale spójny program do partycjonowania. Szczegóły opcji partycjonowania można znaleźć w oficjalnych opcjach konfiguracji dokumentacja.
kod:
val df = spark.read
.format("com.mongodb.spark.sql.DefaultSource")
.option("uri", "mongodb://127.0.0.1/enron_mail.messages")
.option("partitioner", "spark.mongodb.input.partitionerOptions.MongoPaginateBySizePartitioner ")
.load()