diff --git a/.circleci/config.yml b/.circleci/config.yml index f8a2614dcc3c..53ee149c3b21 100644 --- a/.circleci/config.yml +++ b/.circleci/config.yml @@ -61,6 +61,21 @@ jobs: - /usr/local/Homebrew - ~/Library/Caches/Homebrew/downloads + test_preview_mt: + machine: true + environment: + <<: *env + TRAVIS_OS_NAME: linux + ARCH: x86_64 + ARCH_CMD: linux64 + steps: + - checkout + - run: bin/ci prepare_system + - run: echo 'export CURRENT_TAG="$CIRCLE_TAG"' >> $BASH_ENV + - run: bin/ci prepare_build + - run: bin/ci with_build_env 'make crystal' + - run: bin/ci with_build_env 'CRYSTAL_WORKERS=4 make std_spec threads=1 verbose=1 FLAGS="-D preview_mt"' + check_format: machine: true environment: @@ -390,6 +405,8 @@ workflows: filters: *unless_maintenance - test_darwin: filters: *unless_maintenance + - test_preview_mt: + filters: *unless_maintenance - check_format: filters: *unless_maintenance - sync_docs_s3: @@ -412,6 +429,8 @@ workflows: filters: *per_tag - test_darwin: filters: *per_tag + - test_preview_mt: + filters: *per_tag - check_format: filters: *per_tag - prepare_common: @@ -481,6 +500,7 @@ workflows: - test_linux - test_linux32 - test_darwin + - test_preview_mt - check_format - prepare_common: requires: @@ -539,6 +559,8 @@ workflows: filters: *maintenance - test_darwin: filters: *maintenance + - test_preview_mt: + filters: *maintenance - check_format: filters: *maintenance - prepare_common: diff --git a/spec/compiler/codegen/class_spec.cr b/spec/compiler/codegen/class_spec.cr index b8504543f1e5..1475d6ab21ee 100644 --- a/spec/compiler/codegen/class_spec.cr +++ b/spec/compiler/codegen/class_spec.cr @@ -447,6 +447,8 @@ describe "Code gen: class" do it "allows using self in class scope" do run(%( + require "prelude" + class Foo def self.foo 1 @@ -713,6 +715,8 @@ describe "Code gen: class" do it "codegens singleton (#718)" do run(%( + require "prelude" + class Singleton @@instance = new diff --git a/spec/compiler/codegen/class_var_spec.cr b/spec/compiler/codegen/class_var_spec.cr index ec35957803dd..0b63d68f6342 100644 --- a/spec/compiler/codegen/class_var_spec.cr +++ b/spec/compiler/codegen/class_var_spec.cr @@ -106,6 +106,8 @@ describe "Codegen: class var" do it "uses var in class var initializer" do run(%( + require "prelude" + class Foo @@var : Int32 @@var = begin @@ -128,6 +130,8 @@ describe "Codegen: class var" do it "reads simple class var before another complex one" do run(%( + require "prelude" + class Foo @@var2 : Int32 @@var2 = @@var &+ 1 @@ -145,6 +149,8 @@ describe "Codegen: class var" do it "initializes class var of union with single type" do run(%( + require "prelude" + class Foo @@var : Int32 | String @@var = 42 @@ -196,6 +202,8 @@ describe "Codegen: class var" do it "initializes dependent constant before class var" do run(%( + require "prelude" + def foo a = 1 b = 2 @@ -233,6 +241,8 @@ describe "Codegen: class var" do it "doesn't use nilable type for initializer" do run(%( + require "prelude" + class Foo @@foo : Int32? @@foo = 42 @@ -250,7 +260,9 @@ describe "Codegen: class var" do end it "codegens class var with begin and vars" do - run(" + run(%( + require "prelude" + class Foo @@foo : Int32 @@foo = begin @@ -265,11 +277,13 @@ describe "Codegen: class var" do end Foo.foo - ").to_i.should eq(3) + )).to_i.should eq(3) end it "codegens class var with type declaration begin and vars" do - run(" + run(%( + require "prelude" + class Foo @@foo : Int32 = begin a = 1 @@ -283,7 +297,7 @@ describe "Codegen: class var" do end Foo.foo - ").to_i.should eq(3) + )).to_i.should eq(3) end it "codegens class var with nilable reference type" do @@ -338,6 +352,8 @@ describe "Codegen: class var" do it "gets pointerof class var complex constant" do run(%( + require "prelude" + z = Foo.foo class Foo diff --git a/spec/compiler/codegen/const_spec.cr b/spec/compiler/codegen/const_spec.cr index 1a6ce9cd2034..34898ba47524 100644 --- a/spec/compiler/codegen/const_spec.cr +++ b/spec/compiler/codegen/const_spec.cr @@ -87,6 +87,8 @@ describe "Codegen: const" do it "declare constants in right order" do run(%( + require "prelude" + CONST1 = 1 + 1 CONST2 = true ? CONST1 : 0 CONST2 @@ -94,7 +96,9 @@ describe "Codegen: const" do end it "uses correct types lookup" do - run(" + run(%( + require "prelude" + module Moo class B def foo @@ -110,11 +114,13 @@ describe "Codegen: const" do end foo - ").to_i.should eq(1) + )).to_i.should eq(1) end it "codegens variable assignment in const" do - run(" + run(%( + require "prelude" + class Foo def initialize(@x : Int32) end @@ -134,11 +140,13 @@ describe "Codegen: const" do end foo - ").to_i.should eq(1) + )).to_i.should eq(1) end it "declaring var" do - run(" + run(%( + require "prelude" + BAR = begin a = 1 while 1 == 2 @@ -153,7 +161,7 @@ describe "Codegen: const" do end Foo.new.compile - ").to_i.should eq(1) + )).to_i.should eq(1) end it "initialize const that might raise an exception" do @@ -171,7 +179,9 @@ describe "Codegen: const" do end it "allows implicit self in constant, called from another class (bug)" do - run(" + run(%( + require "prelude" + module Foo def self.foo 1 @@ -187,11 +197,13 @@ describe "Codegen: const" do end Bar.new.bar - ").to_i.should eq(1) + )).to_i.should eq(1) end it "codegens two consts with same variable name" do - run(" + run(%( + require "prelude" + CONST1 = begin a = 1 end @@ -201,11 +213,13 @@ describe "Codegen: const" do end (CONST1 + CONST2).to_i - ").to_i.should eq(3) + )).to_i.should eq(3) end it "works with variable declared inside if" do run(%( + require "prelude" + FOO = begin if 1 == 2 x = 3 @@ -220,6 +234,8 @@ describe "Codegen: const" do it "codegens constant that refers to another constant that is a struct" do run(%( + require "prelude" + struct Foo X = Foo.new(1) Y = X @@ -281,6 +297,8 @@ describe "Codegen: const" do it "uses const before declaring it (hoisting)" do run(%( + require "prelude" + x = CONST CONST = foo @@ -342,6 +360,8 @@ describe "Codegen: const" do it "gets pointerof constant" do run(%( + require "prelude" + z = pointerof(FOO).value FOO = 10 z @@ -350,6 +370,8 @@ describe "Codegen: const" do it "gets pointerof complex constant" do run(%( + require "prelude" + z = pointerof(FOO).value FOO = begin a = 10 @@ -446,4 +468,42 @@ describe "Codegen: const" do mod.to_s.should_not contain("CONST") end + + it "synchronizes initialization of constants" do + run(%( + require "prelude" + + def foo + v1, v2 = 1, 1 + rand(100000..10000000).times do + v1, v2 = v2, v1 + v2 + end + v2 + end + + ch = Channel(Int32).new + + 10.times do + spawn do + ch.send X + end + end + + X = foo + + def test(ch) + expected = X + + 10.times do + if ch.receive != expected + return false + end + end + + true + end + + test(ch) + )).to_b.should be_true + end end diff --git a/spec/compiler/codegen/debug_spec.cr b/spec/compiler/codegen/debug_spec.cr index a8b883d15409..09c20b5da858 100644 --- a/spec/compiler/codegen/debug_spec.cr +++ b/spec/compiler/codegen/debug_spec.cr @@ -107,6 +107,8 @@ describe "Code gen: debug" do it "has correct debug location after constant initialization in call with block (#4719)" do codegen(%( + require "prelude" + fun __crystal_malloc_atomic(size : UInt32) : Void* x = uninitialized Void* x diff --git a/spec/compiler/codegen/hooks_spec.cr b/spec/compiler/codegen/hooks_spec.cr index eb745bb5d320..c2dfe6a4e568 100644 --- a/spec/compiler/codegen/hooks_spec.cr +++ b/spec/compiler/codegen/hooks_spec.cr @@ -115,7 +115,9 @@ describe "Code gen: hooks" do end it "does inherited macro before class body" do - run(" + run(%( + require "prelude" + class Global @@x = 123 @@ -142,7 +144,7 @@ describe "Code gen: hooks" do end Bar.y - ").to_i.should eq(123) + )).to_i.should eq(123) end it "does finished" do diff --git a/spec/compiler/codegen/macro_spec.cr b/spec/compiler/codegen/macro_spec.cr index c6b22b1af023..85f92ef918e1 100644 --- a/spec/compiler/codegen/macro_spec.cr +++ b/spec/compiler/codegen/macro_spec.cr @@ -663,6 +663,8 @@ describe "Code gen: macro" do it "transforms hooks (bug)" do codegen(%( + require "prelude" + module GC def self.add_finalizer(object : T) object.responds_to?(:finalize) @@ -800,6 +802,8 @@ describe "Code gen: macro" do it "copies base macro def to sub-subtype even after it was copied to a subtype (#448)" do run(%( + require "prelude" + class Object def class_name : String {{@type.name.stringify}} diff --git a/spec/compiler/codegen/pointer_spec.cr b/spec/compiler/codegen/pointer_spec.cr index c95333411e36..1f6a7b75de8c 100644 --- a/spec/compiler/codegen/pointer_spec.cr +++ b/spec/compiler/codegen/pointer_spec.cr @@ -250,10 +250,11 @@ describe "Code gen: pointer" do end it "gets pointer to constant" do - run(" + run(%( + require "prelude" FOO = 1 pointerof(FOO).value - ").to_i.should eq(1) + )).to_i.should eq(1) end it "passes pointer of pointer to method" do @@ -324,6 +325,8 @@ describe "Code gen: pointer" do it "does pointerof class variable with class" do run(%( + require "prelude" + class Bar def initialize(@x : Int32) end diff --git a/spec/compiler/codegen/proc_spec.cr b/spec/compiler/codegen/proc_spec.cr index d78e5c06ea1f..98bb279a2513 100644 --- a/spec/compiler/codegen/proc_spec.cr +++ b/spec/compiler/codegen/proc_spec.cr @@ -595,6 +595,8 @@ describe "Code gen: proc" do it "codegens proc to implicit self in constant (#647)" do run(%( + require "prelude" + module Foo def self.blah 1 diff --git a/spec/compiler/codegen/struct_spec.cr b/spec/compiler/codegen/struct_spec.cr index 02a286169700..c5da68588b74 100644 --- a/spec/compiler/codegen/struct_spec.cr +++ b/spec/compiler/codegen/struct_spec.cr @@ -171,7 +171,9 @@ describe "Code gen: struct" do end it "declares const struct" do - run(" + run(%( + require "prelude" + struct Foo def initialize(@x : Int32) end @@ -184,11 +186,13 @@ describe "Code gen: struct" do FOO = Foo.new(1) FOO.x - ").to_i.should eq(1) + )).to_i.should eq(1) end it "uses struct in if" do - run(" + run(%( + require "prelude" + struct Foo def initialize(@x : Int32) end @@ -206,7 +210,7 @@ describe "Code gen: struct" do foo = FOO end foo.x - ").to_i.should eq(1) + )).to_i.should eq(1) end it "uses nilable struct" do diff --git a/spec/std/channel_spec.cr b/spec/std/channel_spec.cr index 898ef6a1304c..6d3313f39c24 100644 --- a/spec/std/channel_spec.cr +++ b/spec/std/channel_spec.cr @@ -7,11 +7,11 @@ end describe Channel do it "creates unbuffered with no arguments" do - Channel(Int32).new.should be_a(Channel::Unbuffered(Int32)) + Channel(Int32).new end it "creates buffered with capacity argument" do - Channel(Int32).new(32).should be_a(Channel::Buffered(Int32)) + Channel(Int32).new(32) end it "send returns channel" do @@ -34,16 +34,16 @@ describe Channel do end end -describe Channel::Unbuffered do +describe "unbuffered" do it "pings" do - ch = Channel::Unbuffered(Int32).new + ch = Channel(Int32).new spawn { ch.send(ch.receive) } ch.send 123 ch.receive.should eq(123) end it "blocks if there is no receiver" do - ch = Channel::Unbuffered(Int32).new + ch = Channel(Int32).new state = 0 main = Fiber.current @@ -65,7 +65,7 @@ describe Channel::Unbuffered do end it "deliver many senders" do - ch = Channel::Unbuffered(Int32).new + ch = Channel(Int32).new spawn { ch.send 1; ch.send 4 } spawn { ch.send 2; ch.send 5 } spawn { ch.send 3; ch.send 6 } @@ -73,40 +73,27 @@ describe Channel::Unbuffered do (1..6).map { ch.receive }.sort.should eq([1, 2, 3, 4, 5, 6]) end - it "gets not full when there is a sender" do - ch = Channel::Unbuffered(Int32).new - ch.full?.should be_true - ch.empty?.should be_true - sender = Fiber.new { ch.send 123 } - yield_to(sender) - ch.empty?.should be_false - ch.full?.should be_true - ch.receive.should eq(123) - end - it "works with select" do - ch1 = Channel::Unbuffered(Int32).new - ch2 = Channel::Unbuffered(Int32).new + ch1 = Channel(Int32).new + ch2 = Channel(Int32).new spawn { ch1.send 123 } Channel.select(ch1.receive_select_action, ch2.receive_select_action).should eq({0, 123}) end it "works with select else" do - ch1 = Channel::Unbuffered(Int32).new - Channel.select({ch1.receive_select_action}, true).should eq({1, nil}) + ch1 = Channel(Int32).new + Channel.select({ch1.receive_select_action}, true).should eq({1, Channel::NotReady}) end it "can send and receive nil" do - ch = Channel::Unbuffered(Nil).new + ch = Channel(Nil).new sender = Fiber.new { ch.send nil } yield_to(sender) - ch.empty?.should be_false ch.receive.should be_nil - ch.empty?.should be_true end it "can be closed" do - ch = Channel::Unbuffered(Int32).new + ch = Channel(Int32).new ch.closed?.should be_false ch.close.should be_nil ch.closed?.should be_true @@ -114,20 +101,20 @@ describe Channel::Unbuffered do end it "can be closed after sending" do - ch = Channel::Unbuffered(Int32).new + ch = Channel(Int32).new spawn { ch.send 123; ch.close } ch.receive.should eq(123) expect_raises(Channel::ClosedError) { ch.receive } end it "can be closed from different fiber" do - ch = Channel::Unbuffered(Int32).new - received = false + ch = Channel(Int32).new + closed = false main = Fiber.current receiver = Fiber.new do expect_raises(Channel::ClosedError) { ch.receive } - received = true + closed = true ensure yield_to(main) end @@ -136,35 +123,38 @@ describe Channel::Unbuffered do ch.close sleep - received.should be_true + closed.should be_true end it "cannot send if closed" do - ch = Channel::Unbuffered(Int32).new + ch = Channel(Int32).new ch.close expect_raises(Channel::ClosedError) { ch.send 123 } end it "can receive? when closed" do - ch = Channel::Unbuffered(Int32).new + ch = Channel(Int32).new ch.close ch.receive?.should be_nil end it "can receive? when not empty" do - ch = Channel::Unbuffered(Int32).new + ch = Channel(Int32).new spawn { ch.send 123 } ch.receive?.should eq(123) end it "wakes up sender fiber when channel is closed" do - ch = Channel::Unbuffered(Nil).new + ch = Channel(Nil).new closed = false main = Fiber.current sender = Fiber.new do - ch.send(nil) - closed = ch.closed? + begin + ch.send(nil) + rescue Channel::ClosedError + closed = true + end yield_to(main) end @@ -177,7 +167,7 @@ describe Channel::Unbuffered do end it "wakes up receiver fibers when channel is closed" do - ch = Channel::Unbuffered(Nil).new + ch = Channel(Nil).new closed = false main = Fiber.current @@ -198,34 +188,31 @@ describe Channel::Unbuffered do end end -describe Channel::Buffered do +describe "buffered" do it "pings" do - ch = Channel::Buffered(Int32).new + ch = Channel(Int32).new(10) spawn { ch.send(ch.receive) } ch.send 123 ch.receive.should eq(123) end it "blocks when full" do - ch = Channel::Buffered(Int32).new(2) + ch = Channel(Int32).new(2) freed = false spawn { 2.times { ch.receive }; freed = true } ch.send 1 - ch.full?.should be_false freed.should be_false ch.send 2 - ch.full?.should be_true freed.should be_false ch.send 3 - ch.full?.should be_false freed.should be_true end it "doesn't block when not full" do - ch = Channel::Buffered(Int32).new + ch = Channel(Int32).new(10) done = false sender = Fiber.new { ch.send 123; done = true } yield_to(sender) @@ -233,30 +220,27 @@ describe Channel::Buffered do end it "gets ready with data" do - ch = Channel::Buffered(Int32).new - ch.empty?.should be_true + ch = Channel(Int32).new(10) ch.send 123 - ch.empty?.should be_false + ch.receive.should eq(123) end it "works with select" do - ch1 = Channel::Buffered(Int32).new - ch2 = Channel::Buffered(Int32).new + ch1 = Channel(Int32).new(10) + ch2 = Channel(Int32).new(10) spawn { ch1.send 123 } Channel.select(ch1.receive_select_action, ch2.receive_select_action).should eq({0, 123}) end it "can send and receive nil" do - ch = Channel::Buffered(Nil).new + ch = Channel(Nil).new(10) sender = Fiber.new { ch.send nil } yield_to(sender) - ch.empty?.should be_false ch.receive.should be_nil - ch.empty?.should be_true end it "can be closed" do - ch = Channel::Buffered(Int32).new + ch = Channel(Int32).new(10) ch.closed?.should be_false ch.close ch.closed?.should be_true @@ -264,14 +248,15 @@ describe Channel::Buffered do end it "can be closed after sending" do - ch = Channel::Buffered(Int32).new - spawn { ch.send 123; ch.close } + ch = Channel(Int32).new(10) + ch.send 123 + ch.close ch.receive.should eq(123) expect_raises(Channel::ClosedError) { ch.receive } end it "can be closed from different fiber" do - ch = Channel::Buffered(Int32).new + ch = Channel(Int32).new(10) received = false main = Fiber.current @@ -289,40 +274,40 @@ describe Channel::Buffered do end it "cannot send if closed" do - ch = Channel::Buffered(Int32).new + ch = Channel(Int32).new(10) ch.close expect_raises(Channel::ClosedError) { ch.send 123 } end it "can receive? when closed" do - ch = Channel::Buffered(Int32).new + ch = Channel(Int32).new(10) ch.close ch.receive?.should be_nil end it "can receive? when not empty" do - ch = Channel::Buffered(Int32).new + ch = Channel(Int32).new(10) spawn { ch.send 123 } ch.receive?.should eq(123) end it "does inspect on unbuffered channel" do - ch = Channel::Unbuffered(Int32).new - ch.inspect.should eq("#") + ch = Channel(Int32).new + ch.inspect.should eq("#") end it "does inspect on buffered channel" do - ch = Channel::Buffered(Int32).new(10) - ch.inspect.should eq("#") + ch = Channel(Int32).new(10) + ch.inspect.should eq("#") end it "does pretty_inspect on unbuffered channel" do - ch = Channel::Unbuffered(Int32).new - ch.pretty_inspect.should eq("#") + ch = Channel(Int32).new + ch.pretty_inspect.should eq("#") end it "does pretty_inspect on buffered channel" do - ch = Channel::Buffered(Int32).new(10) - ch.pretty_inspect.should eq("#") + ch = Channel(Int32).new(10) + ch.pretty_inspect.should eq("#") end end diff --git a/spec/std/concurrent/future_spec.cr b/spec/std/concurrent/future_spec.cr index 7a81fe72cc43..10f7563b8672 100644 --- a/spec/std/concurrent/future_spec.cr +++ b/spec/std/concurrent/future_spec.cr @@ -34,7 +34,7 @@ describe Concurrent::Future do describe "future" do it "computes a value" do - chan = Channel::Unbuffered(Int32).new + chan = Channel(Int32).new f = future { chan.receive } f.running?.should be_true @@ -58,18 +58,11 @@ describe Concurrent::Future do end it "raises" do - # we rely on the channel to sync fibers: - chan = Channel::Unbuffered(Int32).new - f = future do - chan.receive raise IndexError.new("test error") end f.running?.should be_true - chan.send(0) - f.completed?.should be_true - expect_raises(IndexError) { f.get } f.completed?.should be_true end diff --git a/spec/std/concurrent/select_spec.cr b/spec/std/concurrent/select_spec.cr index cd598f1b0e77..460de41f76ca 100644 --- a/spec/std/concurrent/select_spec.cr +++ b/spec/std/concurrent/select_spec.cr @@ -1,10 +1,5 @@ require "spec" -private def yield_to(fiber) - Crystal::Scheduler.enqueue(Fiber.current) - Crystal::Scheduler.resume(fiber) -end - describe "select" do it "select many receviers" do ch1 = Channel(Int32).new @@ -30,13 +25,14 @@ describe "select" do it "select many senders" do ch1 = Channel(Int32).new ch2 = Channel(Int32).new - res = [] of Int32 - spawn do - 5.times { res << ch1.receive } + res = Array.new(10, 0) + + f1 = spawn do + 5.times { res[ch1.receive] = 1 } end - spawn do - 5.times { res << ch2.receive } + f2 = spawn do + 5.times { res[ch2.receive] = 1 } end 10.times do |i| @@ -45,14 +41,19 @@ describe "select" do when ch2.send(i) end end - res.should eq (0...10).to_a + + until f1.dead? && f2.dead? + Fiber.yield + end + + res.should eq Array.new(10, 1) end it "select many receivers, senders" do ch1 = Channel(Int32).new ch2 = Channel(Int32).new res = [] of Int32 - spawn do + f = spawn do 10.times do |i| select when x = ch1.receive @@ -69,6 +70,11 @@ describe "select" do res << y end end + + until f.dead? + Fiber.yield + end + res.should eq (0...10).to_a end @@ -94,10 +100,93 @@ describe "select" do x = b end ensure - yield_to(main) + Crystal::Scheduler.enqueue(main) end sleep x.should eq 1 end + + it "select same channel multiple times" do + ch = Channel(Int32).new + + spawn do + ch.send(123) + end + + select + when ch.send(456) + when x = ch.receive + end + + x.should eq 123 + end + + it "priorize by order when entering in a select" do + ch1 = Channel(Int32).new(5) + ch2 = Channel(Int32).new(5) + + 2.times { ch1.send 1 } + 2.times { ch2.send 2 } + + select + when x = ch1.receive + when x = ch2.receive + end + x.should eq 1 + + select + when x = ch2.receive + when x = ch1.receive + end + x.should eq 2 + end + + it "stress select with send/receive in multiple fibers" do + fibers = 4 + msg_per_sender = 1000 + ch = Array.new(fibers) { Array.new(fibers) { Channel(Int32).new } } + done = Channel({Int32, Int32}).new + + fibers.times do |i| + spawn(name: "sender #{i}") do + channels = ch[i] + msg_per_sender.times do |i| + Channel.send_first(i, channels) + end + channels.map &.send(-1) + end + end + + fibers.times do |i| + spawn(name: "receiver #{i}") do + channels = ch.map { |chs| chs[i] } + closed = 0 + count = 0 + sum = 0 + loop do + x = Channel.receive_first(channels).not_nil! + if x == -1 + closed += 1 + break if closed == fibers + else + count += 1 + sum += x + end + end + done.send({count, sum}) + end + end + + count = 0 + sum = 0 + fibers.times do + c, s = done.receive + count += c + sum += s + end + + count.should eq(fibers * msg_per_sender) + sum.should eq(msg_per_sender * (msg_per_sender - 1) / 2 * fibers) + end end diff --git a/spec/std/concurrent_spec.cr b/spec/std/concurrent_spec.cr index 5913859797cb..cc51355f619b 100644 --- a/spec/std/concurrent_spec.cr +++ b/spec/std/concurrent_spec.cr @@ -58,7 +58,7 @@ describe "concurrent" do end it "spawns named" do - chan = Channel::Unbuffered(Nil).new + chan = Channel(Nil).new spawn(name: "sub") do Fiber.current.name.should eq("sub") chan.close @@ -67,7 +67,7 @@ describe "concurrent" do end it "spawns named with macro" do - chan = Channel::Unbuffered(Nil).new + chan = Channel(Nil).new spawn method_named("foo", chan), name: "foo" chan.receive? end diff --git a/spec/std/http/server/server_spec.cr b/spec/std/http/server/server_spec.cr index 413c5a04f4ac..2687e50b2dc7 100644 --- a/spec/std/http/server/server_spec.cr +++ b/spec/std/http/server/server_spec.cr @@ -55,6 +55,24 @@ describe HTTP::Server do server.listen end + it "closes the server" do + server = HTTP::Server.new { } + address = server.bind_unused_port + ch = Channel(Symbol).new + + spawn do + server.listen + ch.send :end + end + + delay(1) { ch.send :timeout } + + TCPSocket.open(address.address, address.port) { } + + server.close + ch.receive.should eq(:end) + end + it "reuses the TCP port (SO_REUSEPORT)" do s1 = HTTP::Server.new { |ctx| } address = s1.bind_unused_port(reuse_port: true) @@ -376,7 +394,11 @@ describe HTTP::Server do end server.closed?.should be_false - server_done.empty?.should be_true + select + when ret = server_done.receive + fail("Server finished with #{ret}") + else + end end end diff --git a/spec/std/io/io_spec.cr b/spec/std/io/io_spec.cr index 1e29d01f809d..6b2f963abc05 100644 --- a/spec/std/io/io_spec.cr +++ b/spec/std/io/io_spec.cr @@ -822,6 +822,52 @@ describe IO do io.encoding.should eq("UTF-16LE") end end + + describe "#close" do + it "aborts 'read' in a different thread" do + ch = Channel(Symbol).new(1) + + IO.pipe do |read, write| + spawn do + ch.send :start + read.gets + rescue + ch.send :end + end + + delay(1) { ch.send :timeout } + + ch.receive.should eq(:start) + read.close + ch.receive.should eq(:end) + end + end + + it "aborts 'write' in a different thread" do + ch = Channel(Symbol).new(1) + + IO.pipe do |read, write| + f = spawn do + ch.send :start + loop do + write.puts "some line" + end + rescue + ch.send :end + end + + delay(1) { ch.send :timeout } + + ch.receive.should eq(:start) + while f.running? + # Wait until the fiber is blocked + Fiber.yield + end + write.close + ch.receive.should eq(:end) + end + end + end end typeof(STDIN.cooked { }) diff --git a/spec/std/mutex_spec.cr b/spec/std/mutex_spec.cr index f4f232754c2d..5cbfa40ea473 100644 --- a/spec/std/mutex_spec.cr +++ b/spec/std/mutex_spec.cr @@ -64,4 +64,25 @@ describe Mutex do three.should be_true four.should be_true end + + it "works with multiple threads" do + x = 0 + mutex = Mutex.new + + fibers = 10.times.map do + spawn do + 100.times do + mutex.synchronize { x += 1 } + end + end + end.to_a + + fibers.each do |f| + while !f.dead? + Fiber.yield + end + end + + x.should eq(1000) + end end diff --git a/spec/std/process_spec.cr b/spec/std/process_spec.cr index 4c5490f352d2..382bbf3bb886 100644 --- a/spec/std/process_spec.cr +++ b/spec/std/process_spec.cr @@ -161,20 +161,20 @@ describe Process do describe "kill" do it "kills a process" do - process = fork { loop { } } + process = Process.new("yes") process.kill(Signal::KILL).should be_nil end it "kills many process" do - process1 = fork { loop { } } - process2 = fork { loop { } } + process1 = Process.new("yes") + process2 = Process.new("yes") process1.kill(Signal::KILL).should be_nil process2.kill(Signal::KILL).should be_nil end end it "gets the pgid of a process id" do - process = fork { loop { } } + process = Process.new("yes") Process.pgid(process.pid).should be_a(Int32) process.kill(Signal::KILL) Process.pgid.should eq(Process.pgid(Process.pid)) @@ -191,18 +191,20 @@ describe Process do buffer.to_s.lines.size.should eq(1000) end - it "executes the new process with exec" do - with_tempfile("crystal-spec-exec") do |path| - File.exists?(path).should be_false + {% unless flag?(:preview_mt) %} + it "executes the new process with exec" do + with_tempfile("crystal-spec-exec") do |path| + File.exists?(path).should be_false - fork = Process.fork do - Process.exec("/usr/bin/env", {"touch", path}) - end - fork.wait + fork = Process.fork do + Process.exec("/usr/bin/env", {"touch", path}) + end + fork.wait - File.exists?(path).should be_true + File.exists?(path).should be_true + end end - end + {% end %} it "checks for existence" do # We can't reliably check whether it ever returns false, since we can't predict @@ -211,7 +213,7 @@ describe Process do # pid. Process.exists?(Process.ppid).should be_true - process = Process.fork { sleep 5 } + process = Process.new("yes") process.exists?.should be_true process.terminated?.should be_false diff --git a/spec/std/socket/unix_server_spec.cr b/spec/std/socket/unix_server_spec.cr index 14b10f88a5f7..a6bfc3e7a720 100644 --- a/spec/std/socket/unix_server_spec.cr +++ b/spec/std/socket/unix_server_spec.cr @@ -78,20 +78,24 @@ describe UNIXServer do it "raises when server is closed" do with_tempfile("unix_server-closed.sock") do |path| server = UNIXServer.new(path) + ch = Channel(Symbol).new(1) exception = nil + delay(1) { ch.send :timeout } + spawn do begin + ch.send(:begin) server.accept rescue ex exception = ex end + ch.send(:end) end + ch.receive.should eq(:begin) server.close - until exception - Fiber.yield - end + ch.receive.should eq(:end) exception.should be_a(IO::Error) exception.try(&.message).should eq("Closed stream") @@ -115,15 +119,21 @@ describe UNIXServer do it "returns nil when server is closed" do with_tempfile("unix_server-accept2.sock") do |path| server = UNIXServer.new(path) + ch = Channel(Symbol).new(1) ret = :initial - spawn { ret = server.accept? } - server.close + delay(1) { ch.send :timeout } - while ret == :initial - Fiber.yield + spawn do + ch.send :begin + ret = server.accept? + ch.send :end end + ch.receive.should eq(:begin) + server.close + ch.receive.should eq(:end) + ret.should be_nil end end diff --git a/spec/std/time/time_spec.cr b/spec/std/time/time_spec.cr index 62d4f1e0444c..ee95e03edd06 100644 --- a/spec/std/time/time_spec.cr +++ b/spec/std/time/time_spec.cr @@ -175,6 +175,10 @@ describe Time do end end + it "UNIX_EPOCH" do + Time::UNIX_EPOCH.should eq(Time.utc(1970, 1, 1)) + end + it ".unix" do seconds = 1439404155 time = Time.unix(seconds) diff --git a/src/channel.cr b/src/channel.cr index 898562be9b74..f1ae0f00b680 100644 --- a/src/channel.cr +++ b/src/channel.cr @@ -1,4 +1,5 @@ require "fiber" +require "crystal/spin_lock" # A `Channel` enables concurrent communication between fibers. # @@ -15,12 +16,55 @@ require "fiber" # channel.receive # => 0 # channel.receive # => 1 # ``` -abstract class Channel(T) - module SelectAction - abstract def ready? - abstract def execute - abstract def wait +class Channel(T) + @lock = Crystal::SpinLock.new + @queue : Deque(T)? + + module NotReady + extend self + end + + module SelectAction(S) + abstract def execute : S | NotReady + abstract def wait(context : SelectContext(S)) abstract def unwait + abstract def result : S + abstract def lock_object_id + abstract def lock + abstract def unlock + + def create_context_and_wait(state_ptr) + context = SelectContext.new(state_ptr, self) + self.wait(context) + context + end + end + + enum SelectState + None = 0 + Active = 1 + Done = 2 + end + + private class SelectContext(S) + @state : Pointer(Atomic(SelectState)) + property action : SelectAction(S) + @activated = false + + def initialize(@state, @action : SelectAction(S)) + end + + def activated? + @activated + end + + def try_trigger : Bool + _, succeed = @state.value.compare_and_set(SelectState::Active, SelectState::Done) + if succeed + @activated = true + end + succeed + end end class ClosedError < Exception @@ -29,25 +73,32 @@ abstract class Channel(T) end end - def initialize - @closed = false - @senders = Deque(Fiber).new - @receivers = Deque(Fiber).new - end - - def self.new : Unbuffered(T) - Unbuffered(T).new + enum DeliveryState + None + Delivered + Closed end - def self.new(capacity) : Buffered(T) - Buffered(T).new(capacity) + def initialize(@capacity = 0) + @closed = false + @senders = Deque({Fiber, T, SelectContext(Nil)?}).new + @receivers = Deque({Fiber, Pointer(T), Pointer(DeliveryState), SelectContext(T)?}).new + if capacity > 0 + @queue = Deque(T).new(capacity) + end end def close @closed = true - Crystal::Scheduler.enqueue @senders + + @senders.each &.first.enqueue + + @receivers.each do |receiver| + receiver[2].value = DeliveryState::Closed + receiver[0].enqueue + end + @senders.clear - Crystal::Scheduler.enqueue @receivers @receivers.clear nil end @@ -56,6 +107,34 @@ abstract class Channel(T) @closed end + def send(value : T) + @lock.sync do + raise_if_closed + + send_internal(value) do + @senders << {Fiber.current, value, nil} + @lock.unsync do + Crystal::Scheduler.reschedule + end + raise_if_closed + end + + self + end + end + + protected def send_internal(value : T) + if receiver = dequeue_receiver + receiver[1].value = value + receiver[2].value = DeliveryState::Delivered + receiver[0].enqueue + elsif (queue = @queue) && queue.size < @capacity + queue << value + else + yield + end + end + # Receives a value from the channel. # If there is a value waiting, it is returned immediately. Otherwise, this method blocks until a value is sent to the channel. # @@ -78,6 +157,70 @@ abstract class Channel(T) receive_impl { return nil } end + def receive_impl + @lock.sync do + receive_internal do + yield if @closed + + value = uninitialized T + state = DeliveryState::None + @receivers << {Fiber.current, pointerof(value), pointerof(state), nil} + @lock.unsync do + Crystal::Scheduler.reschedule + end + + case state + when DeliveryState::Delivered + value + when DeliveryState::Closed + yield + else + raise "BUG: Fiber was awaken without channel delivery state set" + end + end + end + end + + def receive_internal + if (queue = @queue) && !queue.empty? + deque_value = queue.shift + if sender = dequeue_sender + sender[0].enqueue + queue << sender[1] + end + deque_value + elsif sender = dequeue_sender + sender[0].enqueue + sender[1] + else + yield + end + end + + private def dequeue_receiver + while receiver = @receivers.shift? + if (select_context = receiver[3]) && !select_context.try_trigger + next + end + + break + end + + receiver + end + + private def dequeue_sender + while sender = @senders.shift? + if (select_context = sender[2]) && !select_context.try_trigger + next + end + + break + end + + sender + end + def inspect(io : IO) : Nil to_s(io) end @@ -86,20 +229,20 @@ abstract class Channel(T) pp.text inspect end - protected def wait_for_receive - @receivers << Fiber.current + protected def wait_for_receive(value, state, context) + @receivers << {Fiber.current, value, state, context} end protected def unwait_for_receive - @receivers.delete Fiber.current + @receivers.delete_if { |r| r[0] == Fiber.current } end - protected def wait_for_send - @senders << Fiber.current + protected def wait_for_send(value, context) + @senders << {Fiber.current, value, context} end protected def unwait_for_send - @senders.delete Fiber.current + @senders.delete_if { |r| r[0] == Fiber.current } end protected def raise_if_closed @@ -111,7 +254,12 @@ abstract class Channel(T) end def self.receive_first(channels : Tuple | Array) - self.select(channels.map(&.receive_select_action))[1] + _, value = self.select(channels.map(&.receive_select_action)) + if value.is_a?(NotReady) + raise "BUG: Channel.select returned not ready status" + end + + value end def self.send_first(value, *channels) @@ -127,23 +275,50 @@ abstract class Channel(T) self.select ops end - def self.select(ops : Tuple | Array, has_else = false) - loop do - ops.each_with_index do |op, index| - if op.ready? - result = op.execute - return index, result - end - end + def self.select(ops : Indexable(SelectAction), has_else = false) + # Sort the operations by the channel they contain + # This is to avoid deadlocks between concurrent `select` calls + ops_locks = ops + .to_a + .uniq(&.lock_object_id) + .sort_by(&.lock_object_id) + + ops_locks.each &.lock - if has_else - return ops.size, nil + ops.each_with_index do |op, index| + ignore = false + result = op.execute + + unless result.is_a?(NotReady) + ops_locks.each &.unlock + return index, result end + end + + if has_else + ops_locks.each &.unlock + return ops.size, NotReady + end - ops.each &.wait - Crystal::Scheduler.reschedule - ops.each &.unwait + state = Atomic(SelectState).new(SelectState::Active) + contexts = ops.map &.create_context_and_wait(pointerof(state)) + + ops_locks.each &.unlock + Crystal::Scheduler.reschedule + + ops.each do |op| + op.lock + op.unwait + op.unlock + end + + contexts.each_with_index do |context, index| + if context.activated? + return index, context.action.result + end end + + raise "BUG: Fiber was awaken from select but no action was activated" end # :nodoc: @@ -157,164 +332,79 @@ abstract class Channel(T) end # :nodoc: - struct ReceiveAction(C) - include SelectAction - - def initialize(@channel : C) + class ReceiveAction(T) + include SelectAction(T) + property value : T + property state : DeliveryState + + def initialize(@channel : Channel(T)) + @value = uninitialized T + @state = DeliveryState::None end - def ready? - !@channel.empty? + def execute + @channel.receive_internal { return NotReady } end - def execute - @channel.receive + def result + @value end - def wait - @channel.wait_for_receive + def wait(context : SelectContext(T)) + @channel.wait_for_receive(pointerof(@value), pointerof(@state), context) end def unwait @channel.unwait_for_receive end - end - - # :nodoc: - struct SendAction(C, T) - include SelectAction - - def initialize(@channel : C, @value : T) - end - - def ready? - !@channel.full? - end - def execute - @channel.send(@value) + def lock_object_id + @channel.object_id end - def wait - @channel.wait_for_send + def lock + @channel.@lock.lock end - def unwait - @channel.unwait_for_send + def unlock + @channel.@lock.unlock end end -end -# Buffered channel, using a queue. -class Channel::Buffered(T) < Channel(T) - def initialize(@capacity = 32) - @queue = Deque(T).new(@capacity) - super() - end + # :nodoc: + class SendAction(T) + include SelectAction(Nil) - # Send a value to the channel. - def send(value : T) - while full? - raise_if_closed - @senders << Fiber.current - Crystal::Scheduler.reschedule + def initialize(@channel : Channel(T), @value : T) end - raise_if_closed - - @queue << value - Crystal::Scheduler.enqueue @receivers - @receivers.clear - - self - end - - private def receive_impl - while empty? - yield if @closed - @receivers << Fiber.current - Crystal::Scheduler.reschedule + def execute + @channel.send_internal(@value) { return NotReady } + nil end - @queue.shift.tap do - Crystal::Scheduler.enqueue @senders - @senders.clear + def result + nil end - end - - def full? - @queue.size >= @capacity - end - - def empty? - @queue.empty? - end -end -# Unbuffered channel. -class Channel::Unbuffered(T) < Channel(T) - @sender : Fiber? - - def initialize - @has_value = false - @value = uninitialized T - super - end - - # Send a value to the channel. - def send(value : T) - while @has_value - raise_if_closed - @senders << Fiber.current - Crystal::Scheduler.reschedule + def wait(context : SelectContext(Nil)) + @channel.wait_for_send(@value, context) end - raise_if_closed - - @value = value - @has_value = true - @sender = Fiber.current - - if receiver = @receivers.shift? - receiver.resume - else - Crystal::Scheduler.reschedule + def unwait + @channel.unwait_for_send end - end - private def receive_impl - until @has_value - yield if @closed - @receivers << Fiber.current - if sender = @senders.shift? - sender.resume - else - Crystal::Scheduler.reschedule - end + def lock_object_id + @channel.object_id end - yield if @closed - - @value.tap do - @has_value = false - Crystal::Scheduler.enqueue @sender.not_nil! - @sender = nil + def lock + @channel.@lock.lock end - end - - def empty? - !@has_value && @senders.empty? - end - def full? - @has_value || @receivers.empty? - end - - def close - super - if sender = @sender - Crystal::Scheduler.enqueue sender - @sender = nil + def unlock + @channel.@lock.unlock end end end diff --git a/src/compiler/crystal/codegen/class_var.cr b/src/compiler/crystal/codegen/class_var.cr index e01adfeeafda..cbdffd5a8b48 100644 --- a/src/compiler/crystal/codegen/class_var.cr +++ b/src/compiler/crystal/codegen/class_var.cr @@ -82,6 +82,7 @@ class Crystal::CodeGenVisitor def initialize_class_var(class_var : MetaTypeVar, initializer : ClassVarInitializer) init_func = create_initialize_class_var_function(class_var, initializer) + init_func = check_main_fun(init_func.name, init_func) if init_func # For unsafe class var we just initialize them without # using a flag to know if they were initialized @@ -89,7 +90,6 @@ class Crystal::CodeGenVisitor global = declare_class_var(class_var) global = ensure_class_var_in_this_module(global, class_var) if init_func - check_main_fun init_func.name, init_func call init_func end return global @@ -97,20 +97,12 @@ class Crystal::CodeGenVisitor global, initialized_flag = declare_class_var_and_initialized_flag_in_this_module(class_var) - initialized_block, not_initialized_block = new_blocks "initialized", "not_initialized" - - initialized = load(initialized_flag) - cond initialized, initialized_block, not_initialized_block - - position_at_end not_initialized_block - store int1(1), initialized_flag - - init_func = check_main_fun init_func.name, init_func - call init_func - - br initialized_block + lazy_initialize_class_var(initializer.node, init_func, global, initialized_flag) + end - position_at_end initialized_block + def lazy_initialize_class_var(node, init_func, global, initialized_flag) + set_current_debug_location node if @debug.line_numbers? + run_once(initialized_flag, init_func) global end @@ -317,22 +309,8 @@ class Crystal::CodeGenVisitor in_main do define_main_function(fun_name, ([] of LLVM::Type), llvm_type(class_var.type).pointer) do |func| - initialized_block, not_initialized_block = new_blocks "initialized", "not_initialized" - - initialized = load(initialized_flag) - cond initialized, initialized_block, not_initialized_block - - position_at_end not_initialized_block - store int1(1), initialized_flag - - check_main_fun init_func.name, init_func - call init_func - - br initialized_block - - position_at_end initialized_block - - ret global + init_func = check_main_fun init_func.name, init_func + ret lazy_initialize_class_var(initializer.node, init_func, global, initialized_flag) end end end diff --git a/src/compiler/crystal/codegen/codegen.cr b/src/compiler/crystal/codegen/codegen.cr index 7750c2fba9ec..d27a2bdc7004 100644 --- a/src/compiler/crystal/codegen/codegen.cr +++ b/src/compiler/crystal/codegen/codegen.cr @@ -13,6 +13,8 @@ module Crystal MALLOC_ATOMIC_NAME = "__crystal_malloc_atomic64" REALLOC_NAME = "__crystal_realloc64" GET_EXCEPTION_NAME = "__crystal_get_exception" + ONCE_INIT = "__crystal_once_init" + ONCE = "__crystal_once" class Program def run(code, filename = nil, debug = Debug::Default) @@ -243,12 +245,13 @@ module Crystal initialize_argv_and_argc - initialize_simple_constants - - if @debug.line_numbers? && (filename = @program.filename) - set_current_debug_location Location.new(filename, 1, 1) + if @debug.line_numbers? + set_current_debug_location Location.new(@program.filename || "(no name)", 1, 1) end + once_init + initialize_simple_constants + alloca_vars @program.vars, @program emit_vars_debug_info(@program.vars) if @debug.variables? @@ -301,7 +304,9 @@ module Crystal def visit(node : FunDef) case node.name - when MALLOC_NAME, MALLOC_ATOMIC_NAME, REALLOC_NAME, RAISE_NAME, @codegen.personality_name, GET_EXCEPTION_NAME, RAISE_OVERFLOW_NAME + when MALLOC_NAME, MALLOC_ATOMIC_NAME, REALLOC_NAME, RAISE_NAME, + @codegen.personality_name, GET_EXCEPTION_NAME, RAISE_OVERFLOW_NAME, + ONCE_INIT, ONCE @codegen.accept node end false diff --git a/src/compiler/crystal/codegen/const.cr b/src/compiler/crystal/codegen/const.cr index 54e0b509169d..e3ba1e83420c 100644 --- a/src/compiler/crystal/codegen/const.cr +++ b/src/compiler/crystal/codegen/const.cr @@ -29,6 +29,8 @@ require "./codegen" # and can be done in any order (they have no side effects). class Crystal::CodeGenVisitor + @const_mutex : LLVM::Value? + # The special constants ARGC_UNSAFE and ARGV_UNSAFE need to be initialized # as soon as the program starts, because we have access to argc and argv # in the main function @@ -96,27 +98,16 @@ class Crystal::CodeGenVisitor def initialize_const(const) # Maybe the constant was simple and doesn't need a real initialization - return if const.initializer global, initialized_flag = declare_const_and_initialized_flag(const) - - initialized_block, not_initialized_block = new_blocks "initialized", "not_initialized" - - initialized = load(initialized_flag) - cond initialized, initialized_block, not_initialized_block - - position_at_end not_initialized_block - store int1(1), initialized_flag + return global if const.initializer init_function_name = "~#{const.initialized_llvm_name}" func = @main_mod.functions[init_function_name]? || create_initialize_const_function(init_function_name, const) func = check_main_fun init_function_name, func - call func - - br initialized_block - - position_at_end initialized_block + set_current_debug_location const.locations.try &.first? if @debug.line_numbers? + run_once(initialized_flag, func) global end @@ -202,26 +193,9 @@ class Crystal::CodeGenVisitor end def create_read_const_function(fun_name, const) - global, initialized_flag = declare_const_and_initialized_flag(const) - in_main do define_main_function(fun_name, ([] of LLVM::Type), llvm_type(const.value.type).pointer) do |func| - initialized_block, not_initialized_block = new_blocks "initialized", "not_initialized" - - initialized = load(initialized_flag) - cond initialized, initialized_block, not_initialized_block - - position_at_end not_initialized_block - store int1(1), initialized_flag - - init_function_name = "~#{const.initialized_llvm_name}" - func = @main_mod.functions[init_function_name]? || create_initialize_const_function(init_function_name, const) - call func - - br initialized_block - - position_at_end initialized_block - + global = initialize_const(const) ret global end end diff --git a/src/compiler/crystal/codegen/once.cr b/src/compiler/crystal/codegen/once.cr new file mode 100644 index 000000000000..90f18ed48b22 --- /dev/null +++ b/src/compiler/crystal/codegen/once.cr @@ -0,0 +1,35 @@ +require "./codegen" + +class Crystal::CodeGenVisitor + ONCE_STATE = "~ONCE_STATE" + + def once_init + if once_init_fun = @main_mod.functions[ONCE_INIT]? + once_init_fun = check_main_fun ONCE_INIT, once_init_fun + + once_state_global = @main_mod.globals.add(once_init_fun.return_type, ONCE_STATE) + once_state_global.linkage = LLVM::Linkage::Internal if @single_module + once_state_global.initializer = once_init_fun.return_type.null + + state = call once_init_fun + store state, once_state_global + end + end + + def run_once(flag, func) + once_fun = main_fun(ONCE) + + once_state_global = @llvm_mod.globals[ONCE_STATE]? || begin + once_init_fun = main_fun(ONCE_INIT) + global = @llvm_mod.globals.add(once_init_fun.return_type, ONCE_STATE) + global.linkage = LLVM::Linkage::External + global + end + + call main_fun(ONCE), [ + load(once_state_global), + flag, + bit_cast(func.to_value, once_fun.params.last.type), + ] + end +end diff --git a/src/compiler/crystal/compiler.cr b/src/compiler/crystal/compiler.cr index 3853d896247b..f1ce705a9bba 100644 --- a/src/compiler/crystal/compiler.cr +++ b/src/compiler/crystal/compiler.cr @@ -74,7 +74,7 @@ module Crystal property? no_codegen = false # Maximum number of LLVM modules that are compiled in parallel - property n_threads = 8 + property n_threads : Int32 = {% if flag?(:preview_mt) %} 1 {% else %} 8 {% end %} # Default prelude file to use. This ends up adding a # `require "prelude"` (or whatever name is set here) to @@ -424,54 +424,58 @@ module Crystal return all_reused end - jobs_count = 0 - wait_channel = Channel(Array(String)).new(@n_threads) - - units.each_slice(Math.max(units.size // @n_threads, 1)) do |slice| - jobs_count += 1 - spawn do - # For stats output we want to count how many previous - # .o files were reused, mainly to detect performance regressions. - # Because we fork, we must communicate using a pipe. - reused = [] of String - if wants_stats_or_progress - pr, pw = IO.pipe - spawn do - pr.each_line do |line| - unit = JSON.parse(line) - reused << unit["name"].as_s if unit["reused"].as_bool - @progress_tracker.stage_progress += 1 + {% if flag?(:preview_mt) %} + raise "Cannot fork compiler in multithread mode" + {% else %} + jobs_count = 0 + wait_channel = Channel(Array(String)).new(@n_threads) + + units.each_slice(Math.max(units.size // @n_threads, 1)) do |slice| + jobs_count += 1 + spawn do + # For stats output we want to count how many previous + # .o files were reused, mainly to detect performance regressions. + # Because we fork, we must communicate using a pipe. + reused = [] of String + if wants_stats_or_progress + pr, pw = IO.pipe + spawn do + pr.each_line do |line| + unit = JSON.parse(line) + reused << unit["name"].as_s if unit["reused"].as_bool + @progress_tracker.stage_progress += 1 + end end end - end - codegen_process = fork do - pipe_w = pw - slice.each do |unit| - unit.compile - if pipe_w - unit_json = {name: unit.name, reused: unit.reused_previous_compilation?}.to_json - pipe_w.puts unit_json + codegen_process = fork do + pipe_w = pw + slice.each do |unit| + unit.compile + if pipe_w + unit_json = {name: unit.name, reused: unit.reused_previous_compilation?}.to_json + pipe_w.puts unit_json + end end end - end - codegen_process.wait + codegen_process.wait - if pipe_w = pw - pipe_w.close - Fiber.yield - end + if pipe_w = pw + pipe_w.close + Fiber.yield + end - wait_channel.send reused + wait_channel.send reused + end end - end - jobs_count.times do - reused = wait_channel.receive - all_reused.concat(reused) - end + jobs_count.times do + reused = wait_channel.receive + all_reused.concat(reused) + end - all_reused + all_reused + {% end %} end private def print_macro_run_stats(program) diff --git a/src/compiler/crystal/syntax/parser.cr b/src/compiler/crystal/syntax/parser.cr index 8d7dfc063880..9be592c62cdb 100644 --- a/src/compiler/crystal/syntax/parser.cr +++ b/src/compiler/crystal/syntax/parser.cr @@ -5690,7 +5690,8 @@ module Crystal when :"{{" members << parse_percent_macro_expression when :"{%" - members << parse_percent_macro_control + location = @token.location + members << parse_percent_macro_control.at(location) when :"@[" members << parse_annotation when :";", :NEWLINE diff --git a/src/concurrent.cr b/src/concurrent.cr index d244bdc3832f..cc54f2d8028e 100644 --- a/src/concurrent.cr +++ b/src/concurrent.cr @@ -58,8 +58,11 @@ end # # 2.times { ch.receive } # ``` -def spawn(*, name : String? = nil, &block) +def spawn(*, name : String? = nil, same_thread = false, &block) fiber = Fiber.new(name, &block) + if same_thread + fiber.@current_thread.set(Thread.current) + end Crystal::Scheduler.enqueue fiber fiber end @@ -94,7 +97,7 @@ end # This is because in the first case all spawned fibers refer to # the same local variable, while in the second example copies of # *i* are passed to a `Proc` that eventually invokes the call. -macro spawn(call, *, name = nil, &block) +macro spawn(call, *, name = nil, same_thread = false, &block) {% if block %} {% raise "`spawn(call)` can't be invoked with a block, did you mean `spawn(name: ...) { ... }`?" %} {% end %} @@ -110,7 +113,7 @@ macro spawn(call, *, name = nil, &block) {% end %} {% end %} ) { - spawn(name: {{name}}) do + spawn(name: {{name}}, same_thread: {{same_thread}}) do {% if call.receiver %}{{ call.receiver }}.{% end %}{{call.name}}( {% for arg, i in call.args %} __arg{{i}}, diff --git a/src/crystal/event.cr b/src/crystal/event.cr index 88d1a11f0949..e555bb8fafcd 100644 --- a/src/crystal/event.cr +++ b/src/crystal/event.cr @@ -1,5 +1,9 @@ require "./lib_event2" +{% if flag?(:preview_mt) %} + LibEvent2.evthread_use_pthreads +{% end %} + # :nodoc: struct Crystal::Event VERSION = String.new(LibEvent2.event_get_version) diff --git a/src/crystal/event_loop.cr b/src/crystal/event_loop.cr index a4726b16d4e3..763d8b3e0b41 100644 --- a/src/crystal/event_loop.cr +++ b/src/crystal/event_loop.cr @@ -1,28 +1,41 @@ require "./event" -module Crystal::EventLoop - @@eb = Crystal::Event::Base.new - @@dns_base : Crystal::Event::DnsBase? +class Thread + # :nodoc: + getter(event_base) { Crystal::Event::Base.new } - def self.after_fork - @@eb.reinit + # :nodoc: + getter(loop_fiber) do + Fiber.new(name: "Event Loop") do + loop do + self.event_base.run_once + Crystal::Scheduler.reschedule + end + end end +end + +module Crystal::EventLoop + {% unless flag?(:preview_mt) %} + def self.after_fork + Thread.current.event_base.reinit + end + {% end %} def self.resume loop_fiber.resume end + private def self.event_base + Thread.current.event_base + end + private def self.loop_fiber - @@loop_fiber ||= Fiber.new do - loop do - @@eb.run_once - Crystal::Scheduler.reschedule - end - end + Thread.current.loop_fiber end def self.create_resume_event(fiber) - @@eb.new_event(-1, LibEvent2::EventFlags::None, fiber) do |s, flags, data| + event_base.new_event(-1, LibEvent2::EventFlags::None, fiber) do |s, flags, data| Crystal::Scheduler.enqueue data.as(Fiber) end end @@ -31,7 +44,7 @@ module Crystal::EventLoop flags = LibEvent2::EventFlags::Write flags |= LibEvent2::EventFlags::Persist | LibEvent2::EventFlags::ET if edge_triggered - @@eb.new_event(io.fd, flags, io) do |s, flags, data| + event_base.new_event(io.fd, flags, io) do |s, flags, data| io_ref = data.as(typeof(io)) if flags.includes?(LibEvent2::EventFlags::Write) io_ref.resume_write @@ -45,7 +58,7 @@ module Crystal::EventLoop flags = LibEvent2::EventFlags::Read flags |= LibEvent2::EventFlags::Persist | LibEvent2::EventFlags::ET if edge_triggered - @@eb.new_event(io.fd, flags, io) do |s, flags, data| + event_base.new_event(io.fd, flags, io) do |s, flags, data| io_ref = data.as(typeof(io)) if flags.includes?(LibEvent2::EventFlags::Read) io_ref.resume_read @@ -54,12 +67,4 @@ module Crystal::EventLoop end end end - - private def self.dns_base - @@dns_base ||= @@eb.new_dns_base - end - - def self.create_dns_request(nodename, servname, hints, data, &callback : LibEvent2::DnsGetAddrinfoCallback) - dns_base.getaddrinfo(nodename, servname, hints, data, &callback) - end end diff --git a/src/crystal/fiber_channel.cr b/src/crystal/fiber_channel.cr new file mode 100644 index 000000000000..dbe0cc6187b9 --- /dev/null +++ b/src/crystal/fiber_channel.cr @@ -0,0 +1,23 @@ +# :nodoc: +# +# This struct wraps around a IO pipe to send and receive fibers between +# worker threads. The receiving thread will hang on listening for new fibers +# or fibers that become runnable by the execution of other threads, at the same +# time it waits for other IO events or timers within the event loop +struct Crystal::FiberChannel + @worker_in : IO::FileDescriptor + @worker_out : IO::FileDescriptor + + def initialize + @worker_out, @worker_in = IO.pipe + end + + def send(fiber : Fiber) + @worker_in.write_bytes(fiber.object_id) + end + + def receive + oid = @worker_out.read_bytes(UInt64) + Pointer(Fiber).new(oid).as(Fiber) + end +end diff --git a/src/crystal/lib_event2.cr b/src/crystal/lib_event2.cr index 94cd7bf3550b..f898c528bf27 100644 --- a/src/crystal/lib_event2.cr +++ b/src/crystal/lib_event2.cr @@ -10,6 +10,9 @@ require "c/netdb" {% else %} @[Link("event")] {% end %} +{% if flag?(:preview_mt) %} + @[Link("event_pthreads")] +{% end %} lib LibEvent2 alias Int = LibC::Int @@ -65,4 +68,8 @@ lib LibEvent2 fun evdns_getaddrinfo(base : DnsBase, nodename : UInt8*, servname : UInt8*, hints : LibC::Addrinfo*, cb : DnsGetAddrinfoCallback, arg : Void*) : DnsGetAddrinfoRequest fun evdns_getaddrinfo_cancel(DnsGetAddrinfoRequest) fun evutil_freeaddrinfo(ai : LibC::Addrinfo*) + + {% if flag?(:preview_mt) %} + fun evthread_use_pthreads : Int + {% end %} end diff --git a/src/crystal/once.cr b/src/crystal/once.cr new file mode 100644 index 000000000000..db2c1aaeecde --- /dev/null +++ b/src/crystal/once.cr @@ -0,0 +1,27 @@ +{% if flag?(:preview_mt) %} + fun __crystal_once_init : Void* + Mutex.new.as(Void*) + end + + fun __crystal_once(m : Void*, f : Bool*, init : Void*) + unless f.value + m.as(Mutex).synchronize do + unless f.value + Proc(Nil).new(init, Pointer(Void).null).call + f.value = true + end + end + end + end +{% else %} + fun __crystal_once_init : Void* + Pointer(Void).null + end + + fun __crystal_once(m : Void*, f : Bool*, init : Void*) + unless f.value + Proc(Nil).new(init, Pointer(Void).null).call + f.value = true + end + end +{% end %} diff --git a/src/crystal/rw_lock.cr b/src/crystal/rw_lock.cr new file mode 100644 index 000000000000..de61fd31be14 --- /dev/null +++ b/src/crystal/rw_lock.cr @@ -0,0 +1,37 @@ +# :nodoc: +class Crystal::RWLock + @writer = Atomic(Int32).new(0) + @readers = Atomic(Int32).new(0) + + def read_lock + loop do + while @writer.get != 0 + Intrinsics.pause + end + + @readers.add(1) + + break if @writer.get == 0 + + @readers.sub(1) + end + end + + def read_unlock + @readers.sub(1) + end + + def write_lock + while @writer.swap(1) != 0 + Intrinsics.pause + end + + while @readers.get != 0 + Intrinsics.pause + end + end + + def write_unlock + @writer.lazy_set(0) + end +end diff --git a/src/crystal/scheduler.cr b/src/crystal/scheduler.cr index 2cf8086d5512..108416616991 100644 --- a/src/crystal/scheduler.cr +++ b/src/crystal/scheduler.cr @@ -1,4 +1,5 @@ require "./event_loop" +require "./fiber_channel" require "fiber" require "thread" @@ -15,11 +16,27 @@ class Crystal::Scheduler end def self.enqueue(fiber : Fiber) : Nil - Thread.current.scheduler.enqueue(fiber) + {% if flag?(:preview_mt) %} + th = fiber.@current_thread.lazy_get + + if th.nil? + th = Thread.current.scheduler.find_target_thread + end + + if th == Thread.current + Thread.current.scheduler.enqueue(fiber) + else + th.scheduler.send_fiber(fiber) + end + {% else %} + Thread.current.scheduler.enqueue(fiber) + {% end %} end def self.enqueue(fibers : Enumerable(Fiber)) : Nil - Thread.current.scheduler.enqueue(fibers) + fibers.each do |fiber| + enqueue(fiber) + end end def self.reschedule : Nil @@ -42,6 +59,12 @@ class Crystal::Scheduler Thread.current.scheduler.yield(fiber) end + {% if flag?(:preview_mt) %} + @fiber_channel = Crystal::FiberChannel.new + {% end %} + @lock = Crystal::SpinLock.new + @sleeping = false + # :nodoc: def initialize(@main : Fiber) @current = @main @@ -49,17 +72,17 @@ class Crystal::Scheduler end protected def enqueue(fiber : Fiber) : Nil - @runnables << fiber + @lock.sync { @runnables << fiber } end protected def enqueue(fibers : Enumerable(Fiber)) : Nil - @runnables.concat fibers + @lock.sync { @runnables.concat fibers } end protected def resume(fiber : Fiber) : Nil validate_resumable(fiber) {% if flag?(:preview_mt) %} - ensure_single_resume(fiber) + set_current_thread(fiber) GC.lock_read {% else %} GC.set_stackbottom(fiber.@stack_bottom) @@ -83,17 +106,8 @@ class Crystal::Scheduler end end - private def ensure_single_resume(fiber) - # Set the current thread as the running thread of *fiber*, - # but only if *fiber.@current_thread* is effectively *nil* - if fiber.@current_thread.compare_and_set(nil, Thread.current).last - # the current fiber will leave the current thread shortly - # although it's not resumable yet we need to clear - # *@current.@current_thread* for a future `Scheduler.resume(@current)` - @current.@current_thread.set(nil) - else - fatal_resume_error(fiber, "tried to resume the same fiber multiple times") - end + private def set_current_thread(fiber) + fiber.@current_thread.set(Thread.current) end private def fatal_resume_error(fiber, message) @@ -103,8 +117,10 @@ class Crystal::Scheduler end protected def reschedule : Nil - if runnable = @runnables.shift? - runnable.resume + if runnable = @lock.sync { @runnables.shift? } + unless runnable == Fiber.current + runnable.resume + end else Crystal::EventLoop.resume end @@ -123,4 +139,79 @@ class Crystal::Scheduler @current.resume_event.add(0.seconds) resume(fiber) end + + {% if flag?(:preview_mt) %} + @rr_target = 0 + + protected def find_target_thread + if workers = @@workers + @rr_target += 1 + workers[@rr_target % workers.size] + else + Thread.current + end + end + + def run_loop + loop do + @lock.lock + if runnable = @runnables.shift? + @runnables << Fiber.current + @lock.unlock + runnable.resume + else + @sleeping = true + @lock.unlock + fiber = @fiber_channel.receive + + @lock.lock + @sleeping = false + @runnables << Fiber.current + @lock.unlock + fiber.resume + end + end + end + + def send_fiber(fiber : Fiber) + @lock.lock + if @sleeping + @fiber_channel.send(fiber) + else + @runnables << fiber + end + @lock.unlock + end + + def self.init_workers + @@workers = Array(Thread).new(worker_count) do |i| + if i == 0 + worker_loop = Fiber.new(name: "Worker Loop") { Thread.current.scheduler.run_loop } + Thread.current.scheduler.enqueue worker_loop + Thread.current + else + Thread.new { Thread.current.scheduler.run_loop } + end + end + end + + private def self.worker_count + env_workers = ENV["CRYSTAL_WORKERS"]? + + if env_workers && !env_workers.empty? + workers = env_workers.to_i? + if !workers || workers < 1 + LibC.dprintf 2, "FATAL: Invalid value for CRYSTAL_WORKERS: #{env_workers}\n" + exit 1 + end + + workers + else + # TODO: default worker count, currenlty hardcoded to 4 that seems to be something + # that is benefitial for many scenarios without adding too much contention. + # In the future we could use the number of cores or something associated to it. + 4 + end + end + {% end %} end diff --git a/src/crystal/spin_lock.cr b/src/crystal/spin_lock.cr new file mode 100644 index 000000000000..c5939f884d06 --- /dev/null +++ b/src/crystal/spin_lock.cr @@ -0,0 +1,40 @@ +# :nodoc: +class Crystal::SpinLock + {% if flag?(:preview_mt) %} + @m = Atomic(Int32).new(0) + {% end %} + + def lock + {% if flag?(:preview_mt) %} + while @m.swap(1) == 1 + while @m.get == 1 + Intrinsics.pause + end + end + {% end %} + end + + def unlock + {% if flag?(:preview_mt) %} + @m.lazy_set(0) + {% end %} + end + + def sync + lock + begin + yield + ensure + unlock + end + end + + def unsync + unlock + begin + yield + ensure + lock + end + end +end diff --git a/src/crystal/thread_local_value.cr b/src/crystal/thread_local_value.cr new file mode 100644 index 000000000000..a21626f93038 --- /dev/null +++ b/src/crystal/thread_local_value.cr @@ -0,0 +1,27 @@ +# :nodoc: +struct Crystal::ThreadLocalValue(T) + @values = Hash(Thread, T).new + + def get(&block : -> T) + th = Thread.current + @values.fetch(th) do + @values[th] = yield + end + end + + def get? + @values[Thread.current]? + end + + def set(value : T) + @values[Thread.current] = value + end + + def each + @values.each_value { |t| yield t } + end + + def clear + @values.clear + end +end diff --git a/src/deque.cr b/src/deque.cr index 986045847148..e1bbba548c3d 100644 --- a/src/deque.cr +++ b/src/deque.cr @@ -187,10 +187,14 @@ class Deque(T) # a # => Deque{"a", "c"} # ``` def delete(obj) + delete_if { |i| i == obj } + end + + def delete_if found = false i = 0 while i < @size - if self[i] == obj + if yield self[i] delete_at(i) found = true else diff --git a/src/fiber.cr b/src/fiber.cr index fdc1ff010316..4312b71b2c15 100644 --- a/src/fiber.cr +++ b/src/fiber.cr @@ -11,7 +11,8 @@ fun _fiber_get_stack_top : Void* end class Fiber - @@fibers = Thread::LinkedList(Fiber).new + # :nodoc: + protected class_getter(fibers) { Thread::LinkedList(Fiber).new } # :nodoc: class_getter stack_pool = StackPool.new @@ -32,12 +33,12 @@ class Fiber # :nodoc: def self.inactive(fiber : Fiber) - @@fibers.delete(fiber) + fibers.delete(fiber) end # :nodoc: def self.unsafe_each - @@fibers.unsafe_each { |fiber| yield fiber } + fibers.unsafe_each { |fiber| yield fiber } end def initialize(@name : String? = nil, &@proc : ->) @@ -55,7 +56,7 @@ class Fiber makecontext(stack_ptr, fiber_main) - @@fibers.push(self) + Fiber.fibers.push(self) end # :nodoc: @@ -65,7 +66,7 @@ class Fiber @stack_bottom = GC.current_thread_stack_bottom @name = "main" @current_thread.set(thread) - @@fibers.push(self) + Fiber.fibers.push(self) end # :nodoc: @@ -84,7 +85,7 @@ class Fiber Fiber.stack_pool.release(@stack) # Remove the current fiber from the linked list - @@fibers.delete(self) + Fiber.fibers.delete(self) # Delete the resume event if it was used by `yield` or `sleep` @resume_event.try &.free @@ -119,6 +120,10 @@ class Fiber Crystal::Scheduler.resume(self) end + def enqueue + Crystal::Scheduler.enqueue(self) + end + # :nodoc: def resume_event @resume_event ||= Crystal::EventLoop.create_resume_event(self) diff --git a/src/fiber/stack_pool.cr b/src/fiber/stack_pool.cr index f9e256899cc0..718c2082b3c4 100644 --- a/src/fiber/stack_pool.cr +++ b/src/fiber/stack_pool.cr @@ -5,13 +5,14 @@ class Fiber def initialize @deque = Deque(Void*).new + @mutex = Thread::Mutex.new end # Removes and frees at most *count* stacks from the top of the pool, # returning memory to the operating system. def collect(count = lazy_size // 2) count.times do - if stack = @deque.shift? + if stack = @mutex.synchronize { @deque.shift? } LibC.munmap(stack, STACK_SIZE) else return @@ -21,19 +22,19 @@ class Fiber # Removes a stack from the bottom of the pool, or allocates a new one. def checkout - stack = @deque.pop? || allocate + stack = @mutex.synchronize { @deque.pop? } || allocate {stack, stack + STACK_SIZE} end # Appends a stack to the bottom of the pool. def release(stack) - @deque.push(stack) + @mutex.synchronize { @deque.push(stack) } end # Returns the approximated size of the pool. It may be equal or slightly # bigger or smaller than the actual size. def lazy_size - @deque.size + @mutex.synchronize { @deque.size } end private def allocate diff --git a/src/gc/boehm.cr b/src/gc/boehm.cr index 782670225332..a0dd1c63c020 100644 --- a/src/gc/boehm.cr +++ b/src/gc/boehm.cr @@ -1,3 +1,7 @@ +{% if flag?(:preview_mt) %} + require "crystal/rw_lock" +{% end %} + {% unless flag?(:win32) %} @[Link("pthread")] {% end %} @@ -71,6 +75,10 @@ lib LibGC end module GC + {% if flag?(:preview_mt) %} + @@lock = Crystal::RWLock.new + {% end %} + # :nodoc: def self.malloc(size : LibC::SizeT) : Void* LibGC.malloc(size) @@ -204,23 +212,29 @@ module GC # :nodoc: def self.lock_read {% if flag?(:preview_mt) %} - GC.disable + @@lock.read_lock {% end %} end # :nodoc: def self.unlock_read {% if flag?(:preview_mt) %} - GC.enable + @@lock.read_unlock {% end %} end # :nodoc: def self.lock_write + {% if flag?(:preview_mt) %} + @@lock.write_lock + {% end %} end # :nodoc: def self.unlock_write + {% if flag?(:preview_mt) %} + @@lock.write_unlock + {% end %} end # :nodoc: @@ -247,8 +261,10 @@ module GC {% if flag?(:preview_mt) %} Thread.unsafe_each do |thread| - fiber = thread.scheduler.@current - GC.set_stackbottom(thread, fiber.@stack_bottom) + if scheduler = thread.@scheduler + fiber = scheduler.@current + GC.set_stackbottom(thread, fiber.@stack_bottom) + end end {% end %} diff --git a/src/intrinsics.cr b/src/intrinsics.cr index e9bdd490c94f..7c491648951e 100644 --- a/src/intrinsics.cr +++ b/src/intrinsics.cr @@ -47,6 +47,10 @@ lib LibIntrinsics fun va_start = "llvm.va_start"(ap : Void*) fun va_end = "llvm.va_end"(ap : Void*) + + {% if flag?(:i686) || flag?(:x86_64) %} + fun pause = "llvm.x86.sse2.pause" + {% end %} end module Intrinsics @@ -54,6 +58,12 @@ module Intrinsics LibIntrinsics.debugtrap end + def self.pause + {% if flag?(:i686) || flag?(:x86_64) %} + LibIntrinsics.pause + {% end %} + end + macro memcpy(dest, src, len, is_volatile) {% if compare_versions(Crystal::LLVM_VERSION, "7.0.0") < 0 %} LibIntrinsics.memcpy({{dest}}, {{src}}, {{len}}, 0, {{is_volatile}}) diff --git a/src/io/evented.cr b/src/io/evented.cr index deca61cd8793..08436a8df0f1 100644 --- a/src/io/evented.cr +++ b/src/io/evented.cr @@ -1,5 +1,7 @@ {% skip_file if flag?(:win32) %} +require "crystal/thread_local_value" + module IO::Evented @read_timed_out = false @write_timed_out = false @@ -7,11 +9,11 @@ module IO::Evented @read_timeout : Time::Span? @write_timeout : Time::Span? - @readers : Deque(Fiber)? - @writers : Deque(Fiber)? + @readers = Crystal::ThreadLocalValue(Deque(Fiber)).new + @writers = Crystal::ThreadLocalValue(Deque(Fiber)).new - @read_event : Crystal::Event? - @write_event : Crystal::Event? + @read_event = Crystal::ThreadLocalValue(Crystal::Event).new + @write_event = Crystal::ThreadLocalValue(Crystal::Event).new # Returns the time to wait when reading before raising an `IO::Timeout`. def read_timeout : Time::Span? @@ -98,7 +100,7 @@ module IO::Evented def resume_read(timed_out = false) @read_timed_out = timed_out - if reader = @readers.try &.shift? + if reader = @readers.get?.try &.shift? Crystal::Scheduler.enqueue reader end end @@ -107,7 +109,7 @@ module IO::Evented def resume_write(timed_out = false) @write_timed_out = timed_out - if writer = @writers.try &.shift? + if writer = @writers.get?.try &.shift? Crystal::Scheduler.enqueue writer end end @@ -117,7 +119,7 @@ module IO::Evented end protected def wait_readable(timeout = @read_timeout) : Nil - readers = (@readers ||= Deque(Fiber).new) + readers = @readers.get { Deque(Fiber).new } readers << Fiber.current add_read_event(timeout) Crystal::Scheduler.reschedule @@ -129,7 +131,7 @@ module IO::Evented end private def add_read_event(timeout = @read_timeout) : Nil - event = @read_event ||= Crystal::EventLoop.create_fd_read_event(self) + event = @read_event.get { Crystal::EventLoop.create_fd_read_event(self) } event.add timeout end @@ -138,7 +140,7 @@ module IO::Evented end protected def wait_writable(timeout = @write_timeout) : Nil - writers = (@writers ||= Deque(Fiber).new) + writers = @writers.get { Deque(Fiber).new } writers << Fiber.current add_write_event(timeout) Crystal::Scheduler.reschedule @@ -150,7 +152,7 @@ module IO::Evented end private def add_write_event(timeout = @write_timeout) : Nil - event = @write_event ||= Crystal::EventLoop.create_fd_write_event(self) + event = @write_event.get { Crystal::EventLoop.create_fd_write_event(self) } event.add timeout end @@ -159,31 +161,31 @@ module IO::Evented end def evented_close - @read_event.try &.free - @read_event = nil + @read_event.each &.free + @read_event.clear - @write_event.try &.free - @write_event = nil + @write_event.each &.free + @write_event.clear - if readers = @readers + @readers.each do |readers| Crystal::Scheduler.enqueue readers - readers.clear end + @readers.clear - if writers = @writers + @writers.each do |writers| Crystal::Scheduler.enqueue writers - writers.clear end + @writers.clear end private def resume_pending_readers - if (readers = @readers) && !readers.empty? + if (readers = @readers.get?) && !readers.empty? add_read_event end end private def resume_pending_writers - if (writers = @writers) && !writers.empty? + if (writers = @writers.get?) && !writers.empty? add_write_event end end diff --git a/src/kernel.cr b/src/kernel.cr index 3bf1765e56f2..302b73f6e0d6 100644 --- a/src/kernel.cr +++ b/src/kernel.cr @@ -520,26 +520,28 @@ def abort(message, status = 1) : NoReturn exit status end -class Process - # Hooks are defined here due to load order problems. - def self.after_fork_child_callbacks - @@after_fork_child_callbacks ||= [ - # clean ups (don't depend on event loop): - ->Crystal::Signal.after_fork, - ->Crystal::SignalChildHandler.after_fork, - - # reinit event loop: - ->Crystal::EventLoop.after_fork, - - # more clean ups (may depend on event loop): - ->Random::DEFAULT.new_seed, - ] of -> Nil +{% unless flag?(:preview_mt) %} + class Process + # Hooks are defined here due to load order problems. + def self.after_fork_child_callbacks + @@after_fork_child_callbacks ||= [ + # clean ups (don't depend on event loop): + ->Crystal::Signal.after_fork, + ->Crystal::SignalChildHandler.after_fork, + + # reinit event loop: + ->Crystal::EventLoop.after_fork, + + # more clean ups (may depend on event loop): + ->Random::DEFAULT.new_seed, + ] of -> Nil + end end -end +{% end %} {% unless flag?(:win32) %} # Background loop to cleanup unused fiber stacks. - spawn do + spawn(name: "Fiber Clean Loop") do loop do sleep 5 Fiber.stack_pool.collect @@ -549,3 +551,7 @@ end Signal.setup_default_handlers LibExt.setup_sigfault_handler {% end %} + +{% if flag?(:preview_mt) %} + Crystal::Scheduler.init_workers +{% end %} diff --git a/src/llvm/value_methods.cr b/src/llvm/value_methods.cr index 1e8d2f9664b3..efba6024b4ba 100644 --- a/src/llvm/value_methods.cr +++ b/src/llvm/value_methods.cr @@ -96,7 +96,7 @@ module LLVM::ValueMethods end def to_value - LLVM::Value.new unwrap + LLVM::Value.new @unwrap end def dump diff --git a/src/mutex.cr b/src/mutex.cr index bfdf45e55f61..b26b4633b8d5 100644 --- a/src/mutex.cr +++ b/src/mutex.cr @@ -1,24 +1,26 @@ +require "crystal/spin_lock" + # A fiber-safe mutex. -# -# TODO: this isn't thread-safe yet. class Mutex @mutex_fiber : Fiber? - - def initialize - @lock_count = 0 - end + @lock_count = 0 + @queue = Deque(Fiber).new + @lock = Crystal::SpinLock.new def lock + @lock.lock mutex_fiber = @mutex_fiber current_fiber = Fiber.current if !mutex_fiber @mutex_fiber = current_fiber + @lock.unlock elsif mutex_fiber == current_fiber @lock_count += 1 # recursive lock + @lock.unlock else - queue = @queue ||= Deque(Fiber).new - queue << current_fiber + @queue << current_fiber + @lock.unlock Crystal::Scheduler.reschedule end @@ -26,20 +28,26 @@ class Mutex end def unlock - unless @mutex_fiber == Fiber.current - raise "Attempt to unlock a mutex which is not locked" - end + @lock.lock - if @lock_count > 0 - @lock_count -= 1 - return - end + begin + unless @mutex_fiber == Fiber.current + raise "Attempt to unlock a mutex which is not locked" + end - if fiber = @queue.try &.shift? - @mutex_fiber = fiber - Crystal::Scheduler.enqueue fiber - else - @mutex_fiber = nil + if @lock_count > 0 + @lock_count -= 1 + return + end + + if fiber = @queue.try &.shift? + @mutex_fiber = fiber + fiber.enqueue + else + @mutex_fiber = nil + end + ensure + @lock.unlock end nil diff --git a/src/prelude.cr b/src/prelude.cr index 777b85adb520..6b74c77d4eb2 100644 --- a/src/prelude.cr +++ b/src/prelude.cr @@ -14,6 +14,7 @@ private macro no_win(stmt) end # This list requires ordered statements +require "crystal/once" require "lib_c" require "macros" require "object" diff --git a/src/process.cr b/src/process.cr index f5e08861cc73..208c6c253ae1 100644 --- a/src/process.cr +++ b/src/process.cr @@ -77,6 +77,8 @@ class Process # Runs the given block inside a new process and # returns a `Process` representing the new child process. def self.fork : Process + {% raise("Process fork is unsupported with multithread mode") if flag?(:preview_mt) %} + if pid = fork_internal(will_exec: false) new pid else @@ -97,6 +99,8 @@ class Process # Returns a `Process` representing the new child process in the current process # and `nil` inside the new child process. def self.fork : Process? + {% raise("Process fork is unsupported with multithread mode") if flag?(:preview_mt) %} + if pid = fork_internal(will_exec: false) new pid else @@ -123,7 +127,9 @@ class Process LibC.sigemptyset(pointerof(newmask)) LibC.pthread_sigmask(LibC::SIG_SETMASK, pointerof(newmask), nil) else - Process.after_fork_child_callbacks.each(&.call) + {% unless flag?(:preview_mt) %} + Process.after_fork_child_callbacks.each(&.call) + {% end %} LibC.pthread_sigmask(LibC::SIG_SETMASK, pointerof(oldmask), nil) end when -1 @@ -228,7 +234,7 @@ class Process # A pipe to this process's error. Raises if a pipe wasn't asked when creating the process. getter! error : IO::FileDescriptor - @waitpid : Channel::Buffered(Int32) + @waitpid : Channel(Int32) @wait_count = 0 # Creates a process, executes it, but doesn't wait for it to complete. @@ -295,11 +301,13 @@ class Process fork_io, process_io = IO.pipe(read_blocking: true) @wait_count += 1 + ensure_channel spawn { copy_io(stdio, process_io, channel, close_dst: true) } else process_io, fork_io = IO.pipe(write_blocking: true) @wait_count += 1 + ensure_channel spawn { copy_io(process_io, stdio, channel, close_src: true) } end @@ -399,6 +407,14 @@ class Process end private def channel + if channel = @channel + channel + else + raise "BUG: Notification channel was not initialized for this process" + end + end + + private def ensure_channel @channel ||= Channel(Exception?).new end diff --git a/src/signal.cr b/src/signal.cr index b06e0c409fce..f4c094f25645 100644 --- a/src/signal.cr +++ b/src/signal.cr @@ -192,7 +192,7 @@ module Crystal::Signal end def self.start_loop - spawn do + spawn(name: "Signal Loop") do loop do value = reader.read_bytes(Int32) rescue Errno @@ -269,11 +269,11 @@ module Crystal::SignalChildHandler # child process exited. @@pending = {} of LibC::PidT => Int32 - @@waiting = {} of LibC::PidT => Channel::Buffered(Int32) + @@waiting = {} of LibC::PidT => Channel(Int32) @@mutex = Mutex.new - def self.wait(pid : LibC::PidT) : Channel::Buffered(Int32) - channel = Channel::Buffered(Int32).new(1) + def self.wait(pid : LibC::PidT) : Channel(Int32) + channel = Channel(Int32).new(1) @@mutex.lock if exit_code = @@pending.delete(pid) diff --git a/src/thread.cr b/src/thread.cr index 3162144137cb..f2eeb6589e1f 100644 --- a/src/thread.cr +++ b/src/thread.cr @@ -8,7 +8,7 @@ class Thread # Use spawn and channels instead. # all thread objects, so the GC can see them (it doesn't scan thread locals) - @@threads = Thread::LinkedList(Thread).new + protected class_getter(threads) { Thread::LinkedList(Thread).new } @th : LibC::PthreadT @exception : Exception? @@ -22,7 +22,7 @@ class Thread property previous : Thread? def self.unsafe_each - @@threads.unsafe_each { |thread| yield thread } + threads.unsafe_each { |thread| yield thread } end # Starts a new system thread. @@ -35,7 +35,7 @@ class Thread }, self.as(Void*)) if ret == 0 - @@threads.push(self) + Thread.threads.push(self) else raise Errno.new("pthread_create", ret) end @@ -48,7 +48,7 @@ class Thread @th = LibC.pthread_self @main_fiber = Fiber.new(stack_address, self) - @@threads.push(self) + Thread.threads.push(self) end private def detach @@ -98,7 +98,9 @@ class Thread # Returns the Thread object associated to the running system thread. def self.current : Thread - @@current || raise "BUG: Thread.current returned NULL" + # Thread#start sets @@current as soon it starts. Thus we know + # that if @@current is not set then we are in the main thread + @@current ||= new end # Associates the Thread object to the running system thread. @@ -106,12 +108,6 @@ class Thread end {% end %} - # Create the thread object for the current thread (aka the main thread of the - # process). - # - # TODO: consider moving to `kernel.cr` or `crystal/main.cr` - self.current = new - def self.yield ret = LibC.sched_yield raise Errno.new("sched_yield") unless ret == 0 @@ -136,7 +132,7 @@ class Thread rescue ex @exception = ex ensure - @@threads.delete(self) + Thread.threads.delete(self) Fiber.inactive(fiber) detach { GC.pthread_detach(@th) } end diff --git a/src/thread/linked_list.cr b/src/thread/linked_list.cr index f64d9df95bba..2e66345cf2d5 100644 --- a/src/thread/linked_list.cr +++ b/src/thread/linked_list.cr @@ -6,7 +6,7 @@ class Thread # # Thread-safe doubly linked list of `T` objects that must implement # `#previous : T?` and `#next : T?` methods. - struct LinkedList(T) + class LinkedList(T) @mutex = Thread::Mutex.new @head : T? @tail : T? diff --git a/src/time.cr b/src/time.cr index ace6c121623c..e2201fe5f34d 100644 --- a/src/time.cr +++ b/src/time.cr @@ -268,7 +268,7 @@ struct Time # ``` # Time.utc - Time::UNIX_EPOCH # ``` - UNIX_EPOCH = utc(1970, 1, 1) + UNIX_EPOCH = new(unsafe_utc_seconds: 62135596800) # :nodoc: MAX_SECONDS = 315537897599_i64 @@ -487,6 +487,13 @@ struct Time end end + # :nodoc: + protected def initialize(*, unsafe_utc_seconds : Int64) + @seconds = unsafe_utc_seconds + @nanoseconds = 0 + @location = Location::UTC + end + # Creates a new `Time` instance that corresponds to the number of *seconds* # and *nanoseconds* elapsed from epoch (`0001-01-01 00:00:00.0 UTC`) # in UTC.