Skip to content

Commit 3f02f54

Browse files
committed
Modified stream converters to mostly defer to StepConverters.
1 parent edf0644 commit 3f02f54

File tree

3 files changed

+56
-119
lines changed

3 files changed

+56
-119
lines changed

src/main/scala/scala/compat/java8/StreamConverters.scala

Lines changed: 52 additions & 119 deletions
Original file line numberDiff line numberDiff line change
@@ -15,22 +15,24 @@ trait PrimitiveStreamUnboxer[A, S] {
1515
}
1616

1717
trait Priority6StreamConverters {
18-
implicit class EnrichAnyScalaCollectionWithStream[A](t: TraversableOnce[A]) {
18+
// Note--conversion is only to make sure implicit conversion priority is lower than alternatives.
19+
implicit class EnrichAnyScalaCollectionWithStream[A, CC](cc: CC)(implicit ev: CC <:< TraversableOnce[A]) {
1920
private def mkAcc() = {
2021
val acc = new Accumulator[A]
21-
t.foreach{ acc += _ }
22+
ev(cc).foreach{ acc += _ }
2223
acc
2324
}
2425

2526
def seqStream: Stream[A] = mkAcc().seqStream
2627

2728
def parStream: Stream[A] = mkAcc().parStream
28-
}
29+
}
2930
}
3031

3132
trait Priority5StreamConverters extends Priority6StreamConverters {
32-
implicit class EnrichScalaCollectionWithStream[A <: AnyRef](t: TraversableOnce[A]) {
33+
implicit class EnrichScalaCollectionWithStream[A <: AnyRef, CC](cc: CC)(implicit ev: CC <:< TraversableOnce[A]) {
3334
private def mkArr()(implicit tag: reflect.ClassTag[A]): Array[A] = {
35+
val t = ev(cc)
3436
if (t.isTraversableAgain && t.hasDefiniteSize) {
3537
val sz = t.size
3638
val a = new Array[A](sz)
@@ -49,62 +51,62 @@ trait Priority5StreamConverters extends Priority6StreamConverters {
4951

5052
trait Priority4StreamConverters extends Priority5StreamConverters {
5153
implicit class EnrichAnySteppableWithStream[A, CC](cc: CC)(implicit steppize: CC => MakesAnyStepper[A]) {
52-
def seqStream: Stream[A] = java.util.stream.StreamSupport.stream(steppize(cc).stepper, false)
53-
def parStream: Stream[A] = java.util.stream.StreamSupport.stream(steppize(cc).stepper, true)
54+
def seqStream: Stream[A] = StreamSupport.stream(steppize(cc).stepper, false)
55+
def parStream: Stream[A] = StreamSupport.stream(steppize(cc).stepper.anticipateParallelism, true)
56+
}
57+
implicit class EnrichAnyKeySteppableWithStream[K, CC](cc: CC)(implicit steppize: CC => MakesAnyKeyStepper[K]) {
58+
def seqKeyStream: Stream[K] = StreamSupport.stream(steppize(cc).keyStepper, false)
59+
def parKeyStream: Stream[K] = StreamSupport.stream(steppize(cc).keyStepper.anticipateParallelism, true)
60+
}
61+
implicit class EnrichAnyValueSteppableWithStream[V, CC](cc: CC)(implicit steppize: CC => MakesAnyValueStepper[V]) {
62+
def seqValueStream: Stream[V] = StreamSupport.stream(steppize(cc).valueStepper, false)
63+
def parValueStream: Stream[V] = StreamSupport.stream(steppize(cc).valueStepper.anticipateParallelism, true)
5464
}
5565
}
5666

5767
trait Priority3StreamConverters extends Priority4StreamConverters {
58-
59-
}
60-
61-
trait Priority2StreamConverters extends Priority3StreamConverters {
62-
implicit class EnrichMissingPrimitiveArrayWithStream[A](a: Array[A]) {
63-
private def mkAcc() = {
64-
val acc = new Accumulator[A]
65-
var i = 0
66-
while (i < a.length) {
67-
acc += a(i)
68-
i += 1
69-
}
70-
acc
71-
}
72-
73-
def seqStream: Stream[A] = mkAcc().seqStream
74-
75-
def parStream: Stream[A] = mkAcc().parStream
68+
implicit class EnrichDoubleSteppableWithStream[CC](cc: CC)(implicit steppize: CC => MakesDoubleStepper) {
69+
def seqStream: DoubleStream = StreamSupport.doubleStream(steppize(cc).stepper, false)
70+
def parStream: DoubleStream = StreamSupport.doubleStream(steppize(cc).stepper.anticipateParallelism, true)
7671
}
77-
}
78-
79-
trait Priority1StreamConverters extends Priority2StreamConverters {
80-
implicit class EnrichGenericArrayWithStream[A <: AnyRef](a: Array[A]) {
81-
def seqStream: Stream[A] = java.util.Arrays.stream(a)
82-
def parStream: Stream[A] = seqStream.parallel
72+
implicit class EnrichDoubleKeySteppableWithStream[CC](cc: CC)(implicit steppize: CC => MakesDoubleKeyStepper) {
73+
def seqKeyStream: DoubleStream = StreamSupport.doubleStream(steppize(cc).keyStepper, false)
74+
def parKeyStream: DoubleStream = StreamSupport.doubleStream(steppize(cc).keyStepper.anticipateParallelism, true)
8375
}
84-
85-
implicit class EnrichGenericIndexedSeqWithStream[A](c: collection.IndexedSeqLike[A, _]) {
86-
private def someStream(parallel: Boolean): Stream[A] =
87-
StreamSupport.stream(new converterImpls.StepsAnyIndexedSeq[A](c, 0, c.length), parallel)
88-
def seqStream: Stream[A] = someStream(false)
89-
def parStream: Stream[A] = someStream(true)
76+
implicit class EnrichDoubleValueSteppableWithStream[CC](cc: CC)(implicit steppize: CC => MakesDoubleValueStepper) {
77+
def seqValueStream: DoubleStream = StreamSupport.doubleStream(steppize(cc).valueStepper, false)
78+
def parValueStream: DoubleStream = StreamSupport.doubleStream(steppize(cc).valueStepper.anticipateParallelism, true)
9079
}
91-
92-
implicit class EnrichAnyVectorWithStream[A](c: Vector[A]) {
93-
private def someStream(parallel: Boolean): Stream[A] =
94-
StreamSupport.stream(new converterImpls.StepsAnyVector[A](c, 0, c.length), parallel)
95-
def seqStream: Stream[A] = someStream(false)
96-
def parStream: Stream[A] = someStream(true)
80+
implicit class EnrichIntSteppableWithStream[CC](cc: CC)(implicit steppize: CC => MakesIntStepper) {
81+
def seqStream: IntStream = StreamSupport.intStream(steppize(cc).stepper, false)
82+
def parStream: IntStream = StreamSupport.intStream(steppize(cc).stepper.anticipateParallelism, true)
9783
}
98-
99-
implicit class EnrichGenericFlatHashTableWithStream[A](fht: collection.mutable.FlatHashTable[A]) {
100-
private def someStream(parallel: Boolean): Stream[A] = {
101-
val tbl = runtime.CollectionInternals.getTable(fht)
102-
StreamSupport.stream(new converterImpls.StepsAnyFlatHashTable[A](tbl, 0, tbl.length), parallel)
103-
}
104-
def seqStream: Stream[A] = someStream(false)
105-
def parStream: Stream[A] = someStream(true)
84+
implicit class EnrichIntKeySteppableWithStream[CC](cc: CC)(implicit steppize: CC => MakesIntKeyStepper) {
85+
def seqKeyStream: IntStream = StreamSupport.intStream(steppize(cc).keyStepper, false)
86+
def parKeyStream: IntStream = StreamSupport.intStream(steppize(cc).keyStepper.anticipateParallelism, true)
87+
}
88+
implicit class EnrichIntValueSteppableWithStream[CC](cc: CC)(implicit steppize: CC => MakesIntValueStepper) {
89+
def seqValueStream: IntStream = StreamSupport.intStream(steppize(cc).valueStepper, false)
90+
def parValueStream: IntStream = StreamSupport.intStream(steppize(cc).valueStepper.anticipateParallelism, true)
91+
}
92+
implicit class EnrichLongSteppableWithStream[CC](cc: CC)(implicit steppize: CC => MakesLongStepper) {
93+
def seqStream: LongStream = StreamSupport.longStream(steppize(cc).stepper, false)
94+
def parStream: LongStream = StreamSupport.longStream(steppize(cc).stepper.anticipateParallelism, true)
95+
}
96+
implicit class EnrichLongKeySteppableWithStream[CC](cc: CC)(implicit steppize: CC => MakesLongKeyStepper) {
97+
def seqKeyStream: LongStream = StreamSupport.longStream(steppize(cc).keyStepper, false)
98+
def parKeyStream: LongStream = StreamSupport.longStream(steppize(cc).keyStepper.anticipateParallelism, true)
10699
}
100+
implicit class EnrichLongValueSteppableWithStream[CC](cc: CC)(implicit steppize: CC => MakesLongValueStepper) {
101+
def seqValueStream: LongStream = StreamSupport.longStream(steppize(cc).valueStepper, false)
102+
def parValueStream: LongStream = StreamSupport.longStream(steppize(cc).valueStepper.anticipateParallelism, true)
103+
}
104+
}
107105

106+
trait Priority2StreamConverters extends Priority3StreamConverters {
107+
}
108+
109+
trait Priority1StreamConverters extends Priority2StreamConverters {
108110
implicit class RichStream[A](stream: Stream[A]) {
109111
def accumulate = stream.collect(Accumulator.supplier[A], Accumulator.adder[A], Accumulator.merger[A])
110112

@@ -161,75 +163,6 @@ trait Priority1StreamConverters extends Priority2StreamConverters {
161163
* ```
162164
*/
163165
object StreamConverters extends Priority1StreamConverters {
164-
implicit class EnrichDoubleIndexedSeqWithStream[CC <: collection.IndexedSeqLike[Double, _]](c: CC) {
165-
private def someStream(parallel: Boolean): DoubleStream =
166-
StreamSupport.doubleStream(new converterImpls.StepsDoubleIndexedSeq[CC](c, 0, c.length), parallel)
167-
def seqStream: DoubleStream = someStream(false)
168-
def parStream: DoubleStream = someStream(true)
169-
}
170-
171-
implicit class EnrichIntIndexedSeqWithStream[CC <: collection.IndexedSeqLike[Int, _]](c: CC) {
172-
private def someStream(parallel: Boolean): IntStream =
173-
StreamSupport.intStream(new converterImpls.StepsIntIndexedSeq[CC](c, 0, c.length), parallel)
174-
def seqStream: IntStream = someStream(false)
175-
def parStream: IntStream = someStream(true)
176-
}
177-
178-
implicit class EnrichLongIndexedSeqWithStream[CC <: collection.IndexedSeqLike[Long, _]](c: CC) {
179-
private def someStream(parallel: Boolean): LongStream =
180-
StreamSupport.longStream(new converterImpls.StepsLongIndexedSeq[CC](c, 0, c.length), parallel)
181-
def seqStream: LongStream = someStream(false)
182-
def parStream: LongStream = someStream(true)
183-
}
184-
185-
implicit class EnrichDoubleVectorWithStream(c: Vector[Double]) {
186-
private def someStream(parallel: Boolean): DoubleStream =
187-
StreamSupport.doubleStream(new converterImpls.StepsDoubleVector(c, 0, c.length), parallel)
188-
def seqStream: DoubleStream = someStream(false)
189-
def parStream: DoubleStream = someStream(true)
190-
}
191-
192-
implicit class EnrichIntVectorWithStream(c: Vector[Int]) {
193-
private def someStream(parallel: Boolean): IntStream =
194-
StreamSupport.intStream(new converterImpls.StepsIntVector(c, 0, c.length), parallel)
195-
def seqStream: IntStream = someStream(false)
196-
def parStream: IntStream = someStream(true)
197-
}
198-
199-
implicit class EnrichLongVectorWithStream(c: Vector[Long]) {
200-
private def someStream(parallel: Boolean): LongStream =
201-
StreamSupport.longStream(new converterImpls.StepsLongVector(c, 0, c.length), parallel)
202-
def seqStream: LongStream = someStream(false)
203-
def parStream: LongStream = someStream(true)
204-
}
205-
206-
implicit class EnrichDoubleFlatHashTableWithStream(fht: collection.mutable.FlatHashTable[Double]) {
207-
private def someStream(parallel: Boolean): DoubleStream = {
208-
val tbl = runtime.CollectionInternals.getTable(fht)
209-
StreamSupport.doubleStream(new converterImpls.StepsDoubleFlatHashTable(tbl, 0, tbl.length), parallel)
210-
}
211-
def seqStream: DoubleStream = someStream(false)
212-
def parStream: DoubleStream = someStream(true)
213-
}
214-
215-
implicit class EnrichIntFlatHashTableWithStream(fht: collection.mutable.FlatHashTable[Int]) {
216-
private def someStream(parallel: Boolean): IntStream = {
217-
val tbl = runtime.CollectionInternals.getTable(fht)
218-
StreamSupport.intStream(new converterImpls.StepsIntFlatHashTable(tbl, 0, tbl.length), parallel)
219-
}
220-
def seqStream: IntStream = someStream(false)
221-
def parStream: IntStream = someStream(true)
222-
}
223-
224-
implicit class EnrichLongFlatHashTableWithStream(fht: collection.mutable.FlatHashTable[Long]) {
225-
private def someStream(parallel: Boolean): LongStream = {
226-
val tbl = runtime.CollectionInternals.getTable(fht)
227-
StreamSupport.longStream(new converterImpls.StepsLongFlatHashTable(tbl, 0, tbl.length), parallel)
228-
}
229-
def seqStream: LongStream = someStream(false)
230-
def parStream: LongStream = someStream(true)
231-
}
232-
233166
implicit class EnrichDoubleArrayWithStream(a: Array[Double]) {
234167
def seqStream: DoubleStream = java.util.Arrays.stream(a)
235168
def parStream: DoubleStream = seqStream.parallel

src/main/scala/scala/compat/java8/collectionImpl/Stepper.scala

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -85,6 +85,9 @@ trait StepperLike[@specialized(Double, Int, Long) A, +CC] { self =>
8585

8686
/** Returns the precise underlying type of this `Stepper`. */
8787
def typedPrecisely: CC
88+
89+
/** Warns this `Stepper` that it is likely to be used in a parallel context (used for efficiency only) */
90+
def anticipateParallelism: this.type = this
8891

8992

9093
////

src/test/scala/scala/compat/java8/StreamConvertersTest.scala

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ import org.junit.Assert._
66
class StreamConvertersTest {
77
import java.util.stream._
88
import StreamConverters._
9+
import StepConverters._
910

1011
def assertEq[A](a1: A, a2: A, s: String) { assertEquals(s, a1, a2) } // Weird order normally!
1112
def assertEq[A](a1: A, a2: A) { assertEq(a1, a2, "not equal") }

0 commit comments

Comments
 (0)