I am by no means an expert in Go; quite the opposite. I am currently trying to get familiar with it, starting with the syntax, the memory model and the concurrency model. As usual for me, I am trying to contrast it with something I already know, in this case Java.
So I stumbled upon this interesting talk in which the great Sajma introduces the Go concurrency model with some examples. The slides for the talk and the examples are here. Not far into the talk, a question popped up: think about what it would take to implement the same thing in other languages, like Java.
Is it really that hard? I was not so sure. Sure, Java has neither a 'select' statement nor built-in channels, but it should not be difficult to replicate the examples in Java, should it?
So I thought I could have some fun implementing the examples in Java.
Go concurrency
Before getting to the examples, here is a streamlined recap of the talk (by the way, it's a cool talk, so I really suggest you watch it).
- Go concurrency
- The concurrency model is based on Communicating Sequential Processes (Hoare, 1978)
- Concurrent programs are structured as independent processes that execute sequentially and communicate by passing messages.
- "Don't communicate by sharing memory, share memory by communicating"
- Go primitives: goroutines, channels and the select statement
- Goroutines
- A goroutine is a lightweight thread of execution managed by the Go runtime, not an OS thread
- Channels provide communication between goroutines (analogous to a synchronized queue in Java)
- Select multiplexes communication among goroutines
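To make the channel analogy a bit more concrete, here is a minimal sketch (my own illustration, not taken from the talk) that models an unbuffered Go channel with java.util.concurrent.SynchronousQueue: put blocks until another thread calls take, much like a send on an unbuffered channel blocks until a receiver is ready. A buffered channel such as make(chan Result, 3) is closer to an ArrayBlockingQueue with capacity 3. The class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.SynchronousQueue;

// Hypothetical demo: a SynchronousQueue behaves like an unbuffered Go channel.
final class UnbufferedChannelDemo {

    // Sends `count` messages from a producer thread and receives them on the calling thread.
    static List<String> pingPong(int count) throws InterruptedException {
        BlockingQueue<String> channel = new SynchronousQueue<>(); // like make(chan string)
        Thread producer = new Thread(() -> {
            try {
                for (int i = 0; i < count; i++) {
                    channel.put("msg-" + i); // blocks until a receiver takes it, like c <- v
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        producer.start();

        List<String> received = new ArrayList<>();
        for (int i = 0; i < count; i++) {
            received.add(channel.take()); // blocks until a sender is ready, like <-c
        }
        producer.join();
        return received;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(pingPong(3)); // prints [msg-0, msg-1, msg-2]
    }
}
```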
The example
In the examples we build a hypothetical client that queries Google services (web, image and video search). Ideally, we would like to query those services in parallel and aggregate the answers. All the code for the examples is in github.com/napicella/go-for-java-programmers.
So let's get started.
First example: querying Google search in parallel
This is what the Go code looks like:
func Google(query string) (results []Result) {
	c := make(chan Result)
	go func() { c <- Web(query) }()
	go func() { c <- Image(query) }()
	go func() { c <- Video(query) }()
	for i := 0; i < 3; i++ {
		result := <-c
		results = append(results, result)
	}
	return
}
What about Java? My solution uses CompletableFuture, like the following:
// Assumes: import static java.util.concurrent.CompletableFuture.*;
// plus java.util.* and java.util.concurrent.*
public void google(String query) throws ExecutionException, InterruptedException {
    @SuppressWarnings("unchecked")
    CompletableFuture<String>[] futures = new CompletableFuture[] {
        supplyAsync(() -> web(query)),
        supplyAsync(() -> image(query)),
        supplyAsync(() -> video(query))
    };
    List<String> result = new ArrayList<>();
    allOf(futures)
        .thenAccept(ignore -> Arrays.stream(futures)
            .map(this::safeGet)
            .forEach(result::add))
        .get();
    // Note: calling get is necessary only because I want to print the result
    // before returning from the function
    System.out.println(result);
}

protected String safeGet(Future<String> future) {
    try {
        return future.get();
    } catch (InterruptedException e) {
        // restore the interrupt flag instead of silently swallowing it
        Thread.currentThread().interrupt();
    } catch (ExecutionException e) {
        e.printStackTrace();
    }
    return "";
}
The web, image and video services are just mocks with random sleeps.
So, what's the difference between the Java code and the Go one? The Java code is a bit more verbose and does not use message passing between threads as the Go code does; besides that, they look really similar.
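For comparison, here is a minimal sketch of a Java version that does use message passing and mirrors the Go code almost line by line: each task sends its result into a LinkedBlockingQueue, which stands in for the channel, and the caller takes exactly three values. The fakeSearch mock, the fixed pool of three threads and the class name are assumptions for illustration. One difference: the queue is unbounded, so senders never block, whereas a send on the unbuffered Go channel waits for the receiver; since the caller receives all three values anyway, the outcome is the same.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadLocalRandom;

// Hypothetical fan-in sketch: a BlockingQueue plays the role of the Go channel.
final class FanInGoogle {

    // Mock service: sleeps for a random time, then returns a tagged result.
    static String fakeSearch(String kind, String query) {
        try {
            Thread.sleep(ThreadLocalRandom.current().nextInt(10, 50));
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return kind + " result for " + query;
    }

    static List<String> google(String query) throws InterruptedException {
        BlockingQueue<String> c = new LinkedBlockingQueue<>(); // stands in for c := make(chan Result)
        ExecutorService pool = Executors.newFixedThreadPool(3);
        try {
            for (String kind : new String[] {"web", "image", "video"}) {
                pool.execute(() -> c.add(fakeSearch(kind, query))); // go func() { c <- ... }()
            }
            List<String> results = new ArrayList<>();
            for (int i = 0; i < 3; i++) {
                results.add(c.take()); // result := <-c
            }
            return results;
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(google("golang")); // three results, in completion order
    }
}
```

No synchronized list is needed here: only the calling thread ever touches results.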
Let's move to the second example.
Second example: timeout
What if we don't want to wait for slow servers? We can use a timeout!
The idea is to wait until either all the servers reply to our request or the timeout goes off.
func Google(query string) (results []Result) {
	c := make(chan Result, 3)
	go func() { c <- Web(query) }()
	go func() { c <- Image(query) }()
	go func() { c <- Video(query) }()
	timeout := time.After(80 * time.Millisecond)
	for i := 0; i < 3; i++ {
		select {
		case result := <-c:
			results = append(results, result)
		case <-timeout:
			fmt.Println("timed out")
			return
		}
	}
	return
}
Let's see how that would look in Java:
public void googleWithTimeout(String query) throws ExecutionException, InterruptedException {
    // This is the first difference from the Go example: the result list must
    // be a synchronized list, because several tasks add to it concurrently.
    // In Go, the channel is safe for concurrent use and only the receiving
    // goroutine appends to the slice, so no extra locking is needed.
    List<String> result = Collections.synchronizedList(new ArrayList<>());
    // This is not safe: if all the threads in the thread pool (the common
    // ForkJoinPool by default) are busy, the timer won't start
    CompletableFuture<Void> timeout = runAsync(() -> timeout(TIMEOUT_MILLIS));
    anyOf(
        allOf(runAsync(() -> result.add(web(query))),
              runAsync(() -> result.add(image(query))),
              runAsync(() -> result.add(video(query)))),
        timeout
    ).get();
    System.out.println(result);
}

protected Void timeout(int millis) {
    try {
        Thread.sleep(millis);
    } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
    }
    return null;
}
In the Java example there is a substantial difference from the Go one: the tasks share the result list, so for the Java code to work we need a synchronized list. In Go, on the other hand, the channel is safe for concurrent use and only the receiving goroutine appends to the results slice, so no extra synchronization is needed.
As mentioned in the comment, this use of timeout is not completely safe: if all the threads in the thread pool (the common ForkJoinPool by default) are busy, the timer won't start. An easy fix is to run the timer on a dedicated executor, such as a ScheduledExecutorService, or simply on a manually created Thread:
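// Replaces the blocking timeout(int) above (the two cannot coexist in one
// class); the returned future can be passed directly to anyOf
protected CompletableFuture<Void> timeout(int millis) {
    ScheduledExecutorService executorService = Executors.newScheduledThreadPool(1);
    final CompletableFuture<Void> timeout = new CompletableFuture<>();
    executorService.schedule(() -> {
        timeout.complete(null);
    }, millis, TimeUnit.MILLISECONDS);
    // Already-scheduled tasks still run after shutdown() (default policy),
    // and then the pool's thread terminates instead of leaking
    executorService.shutdown();
    return timeout;
}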
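The Go select with a timeout case can also be approximated in Java without sharing a list between tasks. In this minimal sketch the tasks send into a BlockingQueue, and the caller uses poll(timeout, unit) with the time left until a single overall deadline, which plays the role of time.After(80 * time.Millisecond); a null result means the timeout fired first. The mock service, its delays and the class name are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch emulating: select { case r := <-c: ... case <-timeout: ... }
final class TimeoutGoogle {

    // Mock service that answers after a fixed delay.
    static String slowSearch(String kind, long delayMillis) {
        try {
            Thread.sleep(delayMillis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return kind;
    }

    static List<String> google(long timeoutMillis, long... delays) throws InterruptedException {
        BlockingQueue<String> c = new LinkedBlockingQueue<>(); // stands in for the channel
        ExecutorService pool = Executors.newFixedThreadPool(delays.length);
        List<String> results = new ArrayList<>();
        // One deadline for the whole loop, like timeout := time.After(...)
        long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMillis);
        try {
            for (int i = 0; i < delays.length; i++) {
                String kind = "service-" + i;
                long delay = delays[i];
                pool.execute(() -> c.add(slowSearch(kind, delay)));
            }
            for (int i = 0; i < delays.length; i++) {
                long remaining = deadline - System.nanoTime();
                // poll returns null if nothing arrives in time: the "timeout" case
                String result = c.poll(remaining, TimeUnit.NANOSECONDS);
                if (result == null) {
                    System.out.println("timed out");
                    break;
                }
                results.add(result);
            }
            return results;
        } finally {
            pool.shutdownNow(); // interrupt the stragglers
        }
    }

    public static void main(String[] args) throws InterruptedException {
        // Two fast services and one far slower than the 80 ms timeout.
        System.out.println(google(80, 10, 20, 1000));
    }
}
```

Computing the remaining time on every iteration matters: passing the full timeout to each poll would let the total wait grow to three times the timeout.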
Third example: reduce tail latency using replicated search servers
In Go:
func Google(query string) (results []Result) {
	c := make(chan Result, 3)
	go func() { c <- First(query, Web1, Web2) }()
	go func() { c <- First(query, Image1, Image2) }()
	go func() { c <- First(query, Video1, Video2) }()
	timeout := time.After(80 * time.Millisecond)
	for i := 0; i < 3; i++ {
		select {
		case result := <-c:
			results = append(results, result)
		case <-timeout:
			fmt.Println("timed out")
			return
		}
	}
	return
}
where the function First is defined as follows:
func First(query string, replicas ...Search) Result {
	c := make(chan Result, len(replicas))
	searchReplica := func(i int) { c <- replicas[i](query) }
	for i := range replicas {
		go searchReplica(i)
	}
	return <-c
}
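Before looking at the CompletableFuture version, it is worth noting that First itself translates almost literally. The Go code buffers the channel with len(replicas) slots, so the replicas that lose the race can still complete their send and exit instead of blocking forever. In the minimal sketch below, an ArrayBlockingQueue sized to the number of replicas plays that role; representing a Search as Function<String, String>, the pause helper and the class name are my assumptions.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.function.Function;

// Hypothetical translation of Go's First(query, replicas...).
final class Replicas {

    @SafeVarargs
    static String first(String query, Function<String, String>... replicas) throws InterruptedException {
        // Capacity equals the number of replicas, like make(chan Result, len(replicas)),
        // so the replicas that lose the race can still deposit their result and finish.
        BlockingQueue<String> c = new ArrayBlockingQueue<>(replicas.length);
        for (Function<String, String> replica : replicas) {
            Thread t = new Thread(() -> c.offer(replica.apply(query))); // go searchReplica(i)
            t.setDaemon(true); // a slow loser must not keep the JVM alive
            t.start();
        }
        return c.take(); // return <-c: the first answer wins
    }

    // Sleeps, then returns the given value (mock latency).
    private static String pause(long millis, String value) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return value;
    }

    public static void main(String[] args) throws InterruptedException {
        Function<String, String> slow = q -> pause(300, "slow " + q);
        Function<String, String> fast = q -> pause(10, "fast " + q);
        // The fast replica is expected to win the race.
        System.out.println(first("golang", slow, fast));
    }
}
```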
Let's see it in Java:
public void googleWithReplicatedServers(String query) throws ExecutionException, InterruptedException {
    List<String> result = Collections.synchronizedList(new ArrayList<>());
    // Unfortunately this does not work as expected because the inner anyOf
    // won't stop the other calls, so the result might end up having
    // duplicates, e.g. [some-image, some-image, some-video]
    anyOf(
        allOf(
            anyOf(runAsync(() -> result.add(web(query))), runAsync(() -> result.add(webReplica(query)))),
            anyOf(runAsync(() -> result.add(image(query))), runAsync(() -> result.add(imageReplica(query)))),
            anyOf(runAsync(() -> result.add(video(query))), runAsync(() -> result.add(videoReplica(query))))
        ),
        timeout(TIMEOUT_MILLIS)
    ).get();
    System.out.println(result);
}
Unfortunately, the code does not quite work: when one of the two futures completes, its result is added to the list, but the other one keeps running and adds its result too, causing a duplicate.
Let's correct that:
// replicate servers and use the first response, fixing the problem mentioned
// earlier by using supplyAsync + thenAccept instead of runAsync
public void googleWithReplicatedServers2(String query) throws ExecutionException, InterruptedException {
    List<String> result = Collections.synchronizedList(new ArrayList<>());
    anyOf(
        allOf(
            anyOf(supplyAsync(() -> web(query)),
                  supplyAsync(() -> webReplica(query))).thenAccept((s) -> result.add((String) s)),
            anyOf(supplyAsync(() -> image(query)),
                  supplyAsync(() -> imageReplica(query))).thenAccept((s) -> result.add((String) s)),
            anyOf(supplyAsync(() -> video(query)),
                  supplyAsync(() -> videoReplica(query))).thenAccept((s) -> result.add((String) s))
        ),
        timeout(TIMEOUT_MILLIS)
    ).get();
    System.out.println(result);
}
// same as above, but this time we use the function 'first', which is really
// just a wrapper around CompletableFuture.anyOf
public void googleWithReplicatedServers3(String query) throws ExecutionException, InterruptedException {
    List<String> result = Collections.synchronizedList(new ArrayList<>());
    anyOf(
        allOf(
            first(query, Google::web, Google::webReplica).thenAccept((s) -> result.add((String) s)),
            first(query, Google::image, Google::imageReplica).thenAccept((s) -> result.add((String) s)),
            first(query, Google::video, Google::videoReplica).thenAccept((s) -> result.add((String) s))
        ),
        timeout(TIMEOUT_MILLIS)
    ).get();
    System.out.println(result);
}
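The first helper itself is not shown in the post. Here is a minimal sketch of what such a wrapper around CompletableFuture.anyOf might look like; the signature (a query plus varargs Function<String, String> replicas), the pause helper and the class name are assumptions, chosen so that calls like first(query, Google::web, Google::webReplica) type-check when web and webReplica are static methods. Since anyOf returns a CompletableFuture<Object>, callers need the (String) s cast used above.

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;

// Hypothetical helper: a thin wrapper around CompletableFuture.anyOf.
final class FirstOf {

    @SafeVarargs
    static CompletableFuture<Object> first(String query, Function<String, String>... replicas) {
        CompletableFuture<?>[] calls = new CompletableFuture<?>[replicas.length];
        for (int i = 0; i < replicas.length; i++) {
            Function<String, String> replica = replicas[i];
            calls[i] = CompletableFuture.supplyAsync(() -> replica.apply(query));
        }
        // anyOf completes with the first result; the slower calls keep running
        // in the background and their results are simply ignored.
        return CompletableFuture.anyOf(calls);
    }

    // Sleeps, then returns the given value (mock latency).
    private static String pause(long millis, String value) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return value;
    }

    public static void main(String[] args) {
        Function<String, String> slow = q -> pause(300, "slow " + q);
        Function<String, String> fast = q -> "fast " + q;
        // The fast replica is expected to win; join() waits for the winner only.
        System.out.println(first("golang", slow, fast).join());
    }
}
```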
Conclusions
Besides the fact that I had some fun with CompletableFuture, the clear advantage of Go is that the concurrency model is built into the language itself, which simplifies communication among different agents.
On the other hand, I am not sure why they dropped OOP support, such as classes. I mean, what's wrong with OOP?
Top comments (8)
Nice comparison!
Having a sane built-in concurrency model is definitely a big plus for a language. That's definitely one of the things Go excels at.
I do like future-based async code, because it tends to be very clean and testable. Java's implementation, however, is a big mess with very questionable design choices.
Here's a quick draft in Scala that, intention-wise, is very close to your Java code, but uses Scala's Future.sequence and Future.firstCompletedOf, avoiding low-level concurrency completely.
If I were forced to implement this in Java, I would implement sequence (using allOf) and firstCompletedOf (using anyOf) instead of using concurrent data structures.
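Along the lines of this suggestion, a Scala-style sequence can be sketched in Java on top of allOf; the name and signature are borrowed from Scala's Future.sequence and are not part of the JDK. Once allOf has completed, every future is done, so calling join on each of them does not block. (firstCompletedOf maps even more directly onto anyOf.)

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;

// Hypothetical helper modelled on Scala's Future.sequence.
final class Futures {

    static <T> CompletableFuture<List<T>> sequence(List<CompletableFuture<T>> futures) {
        CompletableFuture<?>[] all = futures.toArray(new CompletableFuture<?>[0]);
        // allOf completes once every future has completed; if any of them failed,
        // it completes exceptionally and the thenApply step is skipped.
        return CompletableFuture.allOf(all)
                .thenApply(ignored -> futures.stream()
                        .map(CompletableFuture::join) // cannot block: all are done
                        .collect(Collectors.toList()));
    }

    public static void main(String[] args) {
        List<CompletableFuture<String>> calls = Arrays.asList(
                CompletableFuture.supplyAsync(() -> "web"),
                CompletableFuture.supplyAsync(() -> "image"),
                CompletableFuture.supplyAsync(() -> "video"));
        System.out.println(sequence(calls).join()); // prints [web, image, video]
    }
}
```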
I think you can write the third function in an even shorter way with Future.traverse instead of Future.sequence. I didn't expect this, but I discovered that Future.traverse is really handy in lots of situations.
Thanks for writing down the Scala way.
I have not written any significant piece of code in Scala, but every time I stumble upon something written in it, it looks so elegant.
You made me think I should learn more about it :)
By the way, your Scala code would make a great response post :)
As someone who was introduced to programming using Java and a newfound interest in learning Go, this is an awesome introduction to the basic concepts of the language and has definitely inspired me to investigate further.
Down the rabbit hole I go!
Thank you :)
Just curious, but which is more performant?
I also think the lines between concurrency and parallelism are blurred here. Yes, goroutines are allocated a thread of execution, but CSP is synchronous and blocking by nature: you can only put something on an (unbuffered) channel if the consumer is ready to accept it.
The post was not meant to describe alternative concurrency models, like the actor model you mentioned.
I had some fun writing concurrent code using the primitives included in Java as of version 8.
As you said, there are Java libraries that help write concurrent code within the boundaries of those models, but those libraries are not shipped with the JDK.
Likewise, I am pretty sure there have been attempts to port the Go concurrency model to Java, for example cs.kent.ac.uk/projects/ofa/jcsp/, but again, my post was not about that.
Anyway, your comment about reading (more) about Akka or Kotlin is valid, so I'll take it as constructive feedback, thanks!