
elbywan/cryomongo


cryomongo

A MongoDB driver written in pure Crystal.


Cryomongo is a high-performance MongoDB driver written in pure Crystal, i.e. no C dependencies are needed.

Compatible with MongoDB 3.6+. Tested against: 4.2.

⚠️ BETA state.

If you are looking for a higher-level object-document mapper library, you might want to check out the moongoon shard.

Installation

  1. Add the dependency to your shard.yml:
dependencies:
  cryomongo:
    github: elbywan/cryomongo
  2. Run shards install

Usage

Minimal working example

require "cryomongo"

# Create a Mongo client, using a standard mongodb connection string.
client = Mongo::Client.new # defaults to: "mongodb://localhost:27017"

# Get database and collection.
database = client["database_name"]
collection = database["collection_name"]

# Perform crud operations.
collection.insert_one({ one: 1 })
collection.replace_one({ one: 1 }, { two: 2 })
bson = collection.find_one({ two: 2 })
puts bson.not_nil!["two"] # => 2
collection.delete_one({ two: 2 })
puts collection.count_documents # => 0

Complex example with serialization

require "cryomongo"

# We take advantage of the BSON serialization capabilities provided by the `bson.cr` shard.
record User,
  name : String,
  banned : Bool? = false,
  _id : BSON::ObjectId = BSON::ObjectId.new,
  creation_date : Time = Time.utc do
  include BSON::Serializable
  include JSON::Serializable
end

# Initialize Client, Database and Collection.
client = Mongo::Client.new
database = client["database"]
users = database["users"]

# Set "majority" read and write concerns at the Database level.
database.read_concern = Mongo::ReadConcern.new(level: "majority")
database.write_concern = Mongo::WriteConcern.new(w: "majority")

# Drop and recreate the Collection to ensure that we read later only the documents we inserted in this example.
{ Mongo::Commands::Drop, Mongo::Commands::Create }.each do |command|
  database.command(command, name: "users")
rescue e : Mongo::Error::Command
  # ignore the server error, drop will fail if the collection has not been created before.
end

# Insert User structures that are automatically serialized to BSON.
users.insert_many({ "John", "Jane" }.map { |name|
  User.new(name: name)
}.to_a)

# Fetch a Cursor pointing to the users collection.
cursor = users.find

# Iterate the cursor and use `.of(User)` to deserialize as the cursor gets iterated.
# Then push the users into an array that gets pretty printed.
puts cursor.of(User).to_a.to_pretty_json
# => [
#   {
#     "name": "John",
#     "banned": false,
#     "_id": {
#       "$oid": "f2001c5fb0a33e0264e2ea05"
#     },
#     "creation_date": "2020-07-25T09:52:50Z"
#   },
#   {
#     "name": "Jane",
#     "banned": false,
#     "_id": {
#       "$oid": "f2001c5fb0a33e0264e2ea07"
#     },
#     "creation_date": "2020-07-25T09:52:50Z"
#   }
# ]
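The `record` declaration above gets its constructor defaults from the field declarations, and the included modules add the serialization methods. A minimal sketch of that mechanism using only Crystal's standard `json` library (no server and no `bson.cr` involved); `Account` is a hypothetical record used for illustration only:

```crystal
require "json"

# Hypothetical record with the same shape as the first two fields of `User`.
record Account,
  name : String,
  banned : Bool? = false do
  include JSON::Serializable
end

# The default value of `banned` is filled in by the generated constructor.
account = Account.new(name: "John")
puts account.to_json # => {"name":"John","banned":false}

# JSON::Serializable also generates the constructor used by `from_json`.
parsed = Account.from_json(%({"name":"Jane","banned":true}))
puts parsed.banned # => true
```

With `BSON::Serializable` included as well, the same record can go through both formats, which is what the example above relies on when it prints BSON documents as JSON.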

Features

Conventions

Documentation

The generated API documentation is available here.

Connection

require "cryomongo"

# Mongo::Client is the root object for interacting with a MongoDB deployment.
# It is responsible for monitoring the cluster, routing the requests and managing the socket pools.

# A client can be instantiated using a standard mongodb connection string.

# Client options can be passed as query parameters…
client = Mongo::Client.new("mongodb://address:port/database?appname=MyApp")
# …or with a Mongo::Options instance…
options = Mongo::Options.new(appname: "MyApp")
client = Mongo::Client.new("mongodb://address:port/database", options)
# …or both.

# Instantiate objects to interact with a specific database or a collection…
database   = client["database_name"]
collection = database["collection_name"]
# …or using `default_database` if the connection uri string contains a default auth database component ("/database").
database   = client.default_database
collection = database.not_nil!["collection_name"]

# The overwhelming majority of programs should use a single client and should not bother with closing clients.
# Otherwise, to free the underlying resources a client must be manually closed.
client.close
# To enable SSL/TLS, use the `tls` option, alongside the `tlsCAFile` and `tlsCertificateKeyFile` options.
uri = "mongodb://localhost:27017/?tls=true&tlsCAFile=./ca.crt&tlsCertificateKeyFile=./client.pem"
ssl_client = Mongo::Client.new uri
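A connection string is a regular URI: the path carries the default database used by `default_database`, and the query carries the options. A quick sketch with Crystal's standard `URI` module shows that structure. It is for inspection only: it is not how cryomongo parses the string, and `URI` is not designed for multi-host seed lists such as `host1,host2`.

```crystal
require "uri"

# A single-host connection string with a default database and one option.
uri = URI.parse("mongodb://localhost:27017/database_name?appname=MyApp")

puts uri.scheme # => mongodb
puts uri.host   # => localhost
puts uri.port   # => 27017
# The path carries the default database component ("/database_name").
puts uri.path.lchop('/') # => database_name
# Options are regular query parameters.
puts uri.query_params["appname"] # => MyApp
```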

Authentication

Cryomongo only supports the SCRAM-SHA-1 and SCRAM-SHA-256 authentication mechanisms, without SASLprep.

require "cryomongo"

# To use authentication, specify a username and password when passing a URI to the client constructor.
# The authentication mechanism depends on the server configuration and on the value of the `authMechanism` query parameter.
client = Mongo::Client.new("mongodb://username:password@localhost:27017")
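Credentials are embedded in the URI, so reserved characters such as `@`, `:` or `/` in a username or password have to be percent-encoded, as the MongoDB connection string format requires. A sketch using Crystal's `URI.encode_www_form`, assuming cryomongo percent-decodes credentials as that format specifies; `credentials` is a hypothetical helper:

```crystal
require "uri"

# Hypothetical helper: percent-encodes both parts so that reserved characters
# (@ : / ? # [ ]) cannot be mistaken for connection string delimiters.
def credentials(username : String, password : String) : String
  user = URI.encode_www_form(username, space_to_plus: false)
  pass = URI.encode_www_form(password, space_to_plus: false)
  "#{user}:#{pass}"
end

uri = "mongodb://#{credentials("admin", "p@ss:w/rd")}@localhost:27017"
puts uri # => mongodb://admin:p%40ss%3Aw%2Frd@localhost:27017
```

`space_to_plus: false` matters here: a `+` would be read back as a literal plus sign in the userinfo part, not as a space.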

Basic operations

require "cryomongo"

client = Mongo::Client.new

# Most CRUD operations are performed at collection-level.
collection = client["database_name"]["collection_name"]

# The examples below are very basic, but the methods can accept all the options documented in the MongoDB manual.

## Create

# Insert a single document
collection.insert_one({ key: "value" })
# Insert multiple documents
collection.insert_many((1..100).map { |i| { count: i } })

# To track the _id, generate and pass it as a property
id = BSON::ObjectId.new
collection.insert_one({ _id: id, key: "value" })

## Read

# Find a single document
document = collection.find_one({ _id: id })
document.try { |d| puts d.to_json }
# Find multiple documents.
cursor = collection.find({ qty: { "$gt": 4 }})
elements = cursor.to_a # cursor is an Iterable(BSON)

## Update

# Replace a single document.
collection.replace_one({ name: "John" }, { name: "Jane" })
# Update a single document.
collection.update_one({ name: "John" }, { "$set": { name: "Jane" }})
# Update multiple documents
collection.update_many({ name: { "$in": ["John", "Jane"] }}, { "$set": { name: "Jules" }})
# Find one document and replace it
document = collection.find_one_and_replace({ name: "John" }, { name: "Jane" })
puts document.try &.["name"]
# Find one document and update it
document = collection.find_one_and_update({ name: "John" }, { "$set": { name: "Jane" }})
puts document.try &.["name"]

## Delete

# Delete one document
collection.delete_one({ age: 20 })
# Delete multiple documents
collection.delete_many({ age: { "$lt": 18 }})
# Find one document and delete it
document = collection.find_one_and_delete({ age: { "$lt": 18 }})
puts document.try &.["age"]

## Aggregate

# Perform an aggregation pipeline query
cursor = collection.aggregate([
  {"$match": { status: "available" }},
  {"$limit": 5},
])
cursor.try { |c| c.each { |bson| puts bson.to_json } }

# Distinct collection values
values = collection.distinct(
  key: "field",
  filter: { age: { "$gt": 18 }}
)

# Documents count
counter = collection.count_documents({ age: { "$lt": 18 }})

# Estimated count
counter = collection.estimated_document_count
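The update examples above differ in how much of the matched document survives: `replace_one` swaps the whole document and keeps only `_id`, while an update with `$set` changes only the listed fields. The sketch below models both on plain Crystal hashes; `apply_replace` and `apply_set` are hypothetical helpers, not driver methods, and real documents are BSON rather than hashes.

```crystal
# Documents modelled as hashes for illustration only.
alias Doc = Hash(String, String | Int32)

# Hypothetical: a replacement keeps the `_id` and drops every other field.
def apply_replace(doc : Doc, replacement : Doc) : Doc
  Doc{"_id" => doc["_id"]}.merge(replacement)
end

# Hypothetical: `$set` only overwrites (or adds) the listed fields.
def apply_set(doc : Doc, fields : Doc) : Doc
  doc.merge(fields)
end

john = Doc{"_id" => 1, "name" => "John", "age" => 42}
replaced = apply_replace(john, Doc{"name" => "Jane"})
updated = apply_set(john, Doc{"name" => "Jane"})
p replaced # the "age" field is gone
p updated  # the "age" field is kept
```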

Bulk operations

require "cryomongo"

client = Mongo::Client.new

# A Bulk object can be initialized by calling `.bulk` on a collection.
collection = client["database_name"]["collection_name"]
bulk = collection.bulk
# A bulk is ordered by default.
bulk.ordered? # => true

500.times { |idx|
  # Build the queries by calling bulk methods multiple times.
  bulk.insert_one({number: idx})
  bulk.delete_many({number: {"$lt": 450}})
  bulk.replace_one({ number: idx }, { number: idx + 1})
}

# Execute all the queries and return an aggregated result.
pp bulk.execute(write_concern: Mongo::WriteConcern.new(w: 1))
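Whether a bulk is ordered matters when one operation fails: MongoDB runs an ordered bulk sequentially and stops at the first error, while an unordered bulk carries on and reports every error. A simulation of that behaviour in plain Crystal, where `run_bulk` and `execute` are hypothetical and any operation whose name starts with `bad` fails:

```crystal
# Hypothetical result type: how many operations succeeded and which ones failed.
record BulkResult, succeeded : Int32, failed_indexes : Array(Int32)

# Simulated write: any operation whose name starts with "bad" fails.
def execute(operation : String) : Nil
  raise "write error: #{operation}" if operation.starts_with?("bad")
end

def run_bulk(operations : Array(String), ordered : Bool) : BulkResult
  succeeded = 0
  failed = [] of Int32
  operations.each_with_index do |operation, index|
    begin
      execute(operation)
      succeeded += 1
    rescue
      failed << index
      # An ordered bulk stops at the first failure; an unordered one carries on.
      break if ordered
    end
  end
  BulkResult.new(succeeded, failed)
end

operations = ["insert 1", "bad insert", "insert 3"]
p run_bulk(operations, ordered: true)  # 1 succeeded, index 1 failed, index 2 never ran
p run_bulk(operations, ordered: false) # 2 succeeded, index 1 failed
```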

Indexes

require "cryomongo"

client = Mongo::Client.new
collection = client["database_name"]["collection_name"]

# Create one index without options…
collection.create_index(
  keys: {
    "a":  1,
    "b":  -1,
  }
)
# or with options (snake_cased)…
collection.create_index(
  keys: {
    "a":  1,
    "b":  -1,
  },
  options: {
    unique: true
  }
)
# and optionally specify the name.
collection.create_index(
  keys: {
    "a":  1,
    "b":  -1,
  },
  options: {
    name: "index_name",
  }
)

# Follow the same rules to create multiple indexes with a single method call.
collection.create_indexes([
  {
    keys: { a: 1 }
  },
  {
    keys: { b: 1 }, options: { expire_after_seconds: 3600 }
  }
])
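The second index above uses `expire_after_seconds`, which makes it a TTL index: a document expires once the date stored in the indexed field is more than that many seconds old, so the indexed field (`b` here) must hold dates for documents to ever expire. A sketch of the expiry rule, with `expired?` as a hypothetical helper. Note that the server's TTL monitor only runs about every 60 seconds, so deletion is not immediate.

```crystal
# Hypothetical helper: a document expires once `expire_after_seconds` have passed
# since the date stored in its indexed field.
def expired?(indexed_date : Time, expire_after_seconds : Int32, now : Time = Time.utc) : Bool
  indexed_date + expire_after_seconds.seconds < now
end

created = Time.utc(2020, 7, 25, 9, 0, 0)
puts expired?(created, 3600, now: Time.utc(2020, 7, 25, 9, 30, 0))  # => false
puts expired?(created, 3600, now: Time.utc(2020, 7, 25, 10, 30, 0)) # => true
```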

GridFS

require "cryomongo"

client = Mongo::Client.new
database = client["database_name"]

# A GridFS bucket belongs to a database.
gridfs = database.grid_fs

# Upload
file = File.new("file.txt")
id = gridfs.upload_from_stream("file.txt", file)
file.close

# Download
stream = IO::Memory.new
gridfs.download_to_stream(id, stream)
puts stream.rewind.gets_to_end

# Find
files = gridfs.find({
  length: {"$gte": 5000},
})
files.each { |file|
  puts file.filename
}

# Delete
gridfs.delete(id)

# And many more methods… (see the GridFS section of the API documentation.)
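Under the hood, GridFS stores the file metadata (including its `length` in bytes, which the `find` example filters on) in a files collection and the content in fixed-size chunks, 255 KiB by default. A sketch of the chunk arithmetic; `chunk_count` and `chunk_range` are hypothetical helpers:

```crystal
# GridFS default chunk size: 255 KiB.
DEFAULT_CHUNK_SIZE = 261_120

# Hypothetical helper: number of chunks needed for `length` bytes (ceiling division).
def chunk_count(length : Int64, chunk_size : Int32 = DEFAULT_CHUNK_SIZE) : Int64
  (length + chunk_size - 1) // chunk_size
end

# Hypothetical helper: byte range stored in chunk `n`; the last chunk may be shorter.
def chunk_range(n : Int64, length : Int64, chunk_size : Int32 = DEFAULT_CHUNK_SIZE) : Range(Int64, Int64)
  start = n * chunk_size
  start...Math.min(start + chunk_size, length)
end

length = 600_000_i64
puts chunk_count(length)        # => 3
puts chunk_range(2_i64, length) # => 522240...600000
```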

Change streams

require "cryomongo"

# Change streams can watch a client, database or collection for changes.
# This code snippet will focus on watching a single collection.

client = Mongo::Client.new
collection = client["database_name"]["collection_name"]

spawn {
  cursor = collection.watch(
    [
      {"$match": {"operationType": "insert"}},
    ],
    max_await_time_ms: 10000
  )
  # cursor.of(BSON) converts fetched elements to the Mongo::ChangeStream::Document(BSON) type.
  cursor.of(BSON).each { |doc|
    puts doc.document_key
    puts doc.full_document.to_json
  }
}

100.times do |i|
  collection.insert_one({count: i})
end

# Block the main fiber indefinitely so that the spawned watcher keeps running.
sleep
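`sleep` without an argument blocks the main fiber forever, so the program never exits. When the watcher only needs to handle a known number of events, a `Channel` can signal completion instead. A sketch where both the change stream and the inserts are simulated with a channel of integers (`events`, `done` and `expected` are illustrative names). With a real change stream, keep in mind that it only reports changes made after it was opened.

```crystal
events = Channel(Int32).new
done = Channel(Int32).new
expected = 100

# Watcher fiber: stands in for the change stream loop.
spawn do
  received = 0
  while received < expected
    events.receive
    received += 1
  end
  # Tell the main fiber that every expected event was handled.
  done.send(received)
end

# Producer: stands in for the `insert_one` loop.
expected.times { |i| events.send(i) }

# Blocks only until the watcher is done, unlike a bare `sleep`.
total = done.receive
puts "watcher handled #{total} events" # => watcher handled 100 events
```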

Raw commands

require "cryomongo"

# Commands can be run on a client, database or collection depending on the command target.

client = Mongo::Client.new

# Call the `.command` method to run a command against the server.
# The first argument is a `Mongo::Commands` sub-class, followed by the mandatory arguments
# and finally an *options* named tuple containing the optional parameters in snake_case.
result = client.command(Mongo::Commands::ServerStatus, options: {
  repl: 0
})
puts result.to_bson

# The .command method can also be called against a Database…
client["database"].command(Mongo::Commands::Create, name: "collection")
client["database"].command(Mongo::Commands::Drop, name: "collection")
# …or a Collection.
client["database"]["collection"].command(Mongo::Commands::Validate)

Concerns and Preference

require "cryomongo"

# Instantiate Read/Write Concerns and Preference
read_concern = Mongo::ReadConcern.new(level: "majority")
write_concern = Mongo::WriteConcern.new(w: 1, j: true)
read_preference = Mongo::ReadPreference.new(mode: "primary")

# They can be set at the client, database or collection level…
client = Mongo::Client.new
database = client["database_name"]
collection = database["collection_name"]

client.read_concern = read_concern
database.write_concern = write_concern
collection.read_preference = read_preference

# …or by passing an extra argument when calling a method.
collection.find(
  filter: { key: "value" },
  read_concern:  Mongo::ReadConcern.new(level: "local"),
  read_preference: Mongo::ReadPreference.new(mode: "secondary")
)
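Each setting is resolved from the most specific level that defines it: an argument passed to the method, then the collection, the database and finally the client. A sketch of that precedence, assuming cryomongo follows the usual MongoDB driver inheritance rules; `resolve` is a hypothetical helper and `nil` stands for "not set":

```crystal
# Hypothetical helper: the most specific level that sets a value wins.
def resolve(operation : String?, collection : String?, database : String?, client : String?) : String?
  operation || collection || database || client
end

# Only the client sets a read concern level: it is inherited.
puts resolve(nil, nil, nil, "majority") # => majority
# A value passed to the method overrides the inherited one.
puts resolve("local", nil, nil, "majority") # => local
```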

Commands Monitoring

require "cryomongo"

client = Mongo::Client.new

# A simple logging subscriber.

subscription = client.subscribe_commands { |event|
  case event
  when Mongo::Monitoring::Commands::CommandStartedEvent
    Log.info { "COMMAND.#{event.command_name} #{event.address} STARTED: #{event.command.to_json}" }
  when Mongo::Monitoring::Commands::CommandSucceededEvent
    Log.info { "COMMAND.#{event.command_name} #{event.address} COMPLETED: #{event.reply.to_json} (#{event.duration}s)" }
  when Mongo::Monitoring::Commands::CommandFailedEvent
    Log.info { "COMMAND.#{event.command_name} #{event.address} FAILED: #{event.failure.inspect} (#{event.duration}s)" }
  end
}

# Make some queries…
client["database_name"]["collection_name"].find({ hello: "world" })

# …and at some point, unsubscribe the logger.
client.unsubscribe_commands(subscription)
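The subscription returned by `subscribe_commands` acts as a handle: passing it to `unsubscribe_commands` removes that callback. A sketch of that contract with a hypothetical `EventBus` class, where events are plain strings rather than monitoring event objects:

```crystal
# Hypothetical stand-in for the client's subscription list; events are plain strings.
class EventBus
  alias Callback = Proc(String, Nil)

  @subscribers = [] of Callback

  # Registers the block and returns it as the subscription handle.
  def subscribe(&block : String -> Nil) : Callback
    @subscribers << block
    block
  end

  # Removes the callback identified by the handle.
  def unsubscribe(handle : Callback) : Nil
    @subscribers.delete(handle)
  end

  def publish(event : String) : Nil
    @subscribers.each &.call(event)
  end
end

bus = EventBus.new
log = [] of String
handle = bus.subscribe { |event| log << event; nil }
bus.publish("find STARTED")
bus.unsubscribe(handle)
bus.publish("find SUCCEEDED") # no subscriber left, nothing is recorded
p log # => ["find STARTED"]
```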

Causal Consistency

require "cryomongo"

client = Mongo::Client.new
# It is important to ensure that both reads and writes are performed with "majority" concern.
# See: https://docs.mongodb.com/manual/core/causal-consistency-read-write-concerns/
client.read_concern = Mongo::ReadConcern.new(level: "majority")
client.write_concern = Mongo::WriteConcern.new(w: "majority")

# Reusing the original MongoDB example.
# See: https://docs.mongodb.com/manual/core/read-isolation-consistency-recency/#examples

current_date = Time.utc
items_collection = client["test"]["items"]

# MongoDB enables causal consistency in client sessions by default.
# This block syntax creates and ends the session, and passes it to the collection methods automatically.
items_collection.with_session do |items|
  # Using a causally consistent session ensures that the update occurs before the insert.
  items.update_one(
    { sku: "111", end: { "$exists": false } },
    { "$set": { end: current_date }}
  )
  items.insert_one({ sku: "nuts-111", name: "Pecans", start: current_date })
  puts items.find.to_a.to_pretty_json
end

client.close
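Conceptually, a causally consistent session remembers the latest operation time it has observed and sends it as `afterClusterTime` in the read concern of later reads, so the server waits until it has caught up to that point before answering. A sketch of that bookkeeping with a hypothetical `SessionClock` class; real operation times are BSON timestamps, modelled here as integers.

```crystal
# Hypothetical model of a causally consistent session.
class SessionClock
  getter operation_time : Int64? = nil

  # Records the operation time of a server reply; the clock never moves backwards.
  def advance(time : Int64) : Nil
    @operation_time = Math.max(time, @operation_time || time)
  end

  # Read concern for the next read: wait until the server has caught up.
  def read_concern(level : String) : Hash(String, String | Int64)
    concern = Hash(String, String | Int64).new
    concern["level"] = level
    if time = @operation_time
      concern["afterClusterTime"] = time
    end
    concern
  end
end

clock = SessionClock.new
clock.advance(42_i64) # e.g. the operation time returned by the update
clock.advance(41_i64) # an older time is ignored
p clock.read_concern("majority")
```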

Transactions

require "cryomongo"

# Initialize Client and Database instances.
client = Mongo::Client.new
database = client["db"]
collection = database["collection"]

# Create the collection.
{Mongo::Commands::Drop, Mongo::Commands::Create}.each do |command|
  database.command(command, name: "collection")
rescue e : Mongo::Error::Command
  # ignore the server error, drop will fail if the collection has not been created before.
end

# Set read and write concerns to perform isolated transactions.
# See: https://docs.mongodb.com/master/core/transactions/#transactions-and-sessions
transaction_options = Mongo::Session::TransactionOptions.new(
  read_concern: Mongo::ReadConcern.new(level: "snapshot"),
  write_concern: Mongo::WriteConcern.new(w: "majority")
)

# There are two ways to perform transactions:

collection.with_session(default_transaction_options: transaction_options) do |collection, session|
  puts collection.find.to_a.to_json # => "[]"

  # 1. by calling the `with_transaction` method.

  # `with_transaction` will commit after the block ends.
  # If the block raises, the transaction will be aborted.
  session.with_transaction {
    collection.insert_one({_id: 1})
    collection.insert_one({_id: 2})
  }
  puts collection.find.to_a.to_json # => [{"_id":1},{"_id":2}]

  # The transaction below will be aborted because the block raises an Exception.
  begin
    session.with_transaction {
      collection.insert_one({_id: 3})
      raise "Interrupted!"
      collection.insert_one({_id: 4})
    }
  rescue e
    puts e # => Interrupted!
  end
  puts collection.find.to_a.to_json # => [{"_id":1},{"_id":2}]

  # 2. by calling the `start_transaction`, `commit_transaction` and `abort_transaction` methods.
  session.start_transaction
  collection.insert_one({_id: 3})
  # The transaction is isolated, reading outside of the session scope does not return documents impacted by the transaction…
  puts database["collection"].find.to_a.to_json # => [{"_id":1},{"_id":2}]
  # but reading within the session scope does.
  puts collection.find.to_a.to_json # => [{"_id":1},{"_id":2},{"_id":3}]
  session.commit_transaction
  # The transaction is now committed and visible outside of the transaction scope.
  puts collection.find.to_a.to_json             # => [{"_id":1},{"_id":2},{"_id":3}]
  puts database["collection"].find.to_a.to_json # => [{"_id":1},{"_id":2},{"_id":3}]
end
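The block form used in the first approach commits when the block returns and aborts when it raises. The sketch below reproduces that contract on an in-memory list with a hypothetical `MemoryStore` class; it only illustrates the control flow, since a real transaction is staged and committed by the server.

```crystal
# Hypothetical in-memory stand-in: writes are staged during the block and only
# committed when the block returns normally.
class MemoryStore
  getter committed = [] of Int32

  def with_transaction(&block : Array(Int32) -> _)
    staged = [] of Int32
    # If the block raises, the line below is never reached: the staged writes are
    # discarded (abort) and the exception propagates to the caller.
    yield staged
    @committed.concat(staged) # commit
  end
end

store = MemoryStore.new
store.with_transaction { |tx| tx << 1 << 2 }

begin
  store.with_transaction do |tx|
    tx << 3
    raise "Interrupted!"
  end
rescue e
  puts e.message # => Interrupted!
end

p store.committed # => [1, 2]
```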

Specifications

The goal is to be compliant with most of the official MongoDB set of specifications.

✅ Implemented

The following specifications are implemented:

⏳Next

The following specifications are to be implemented next:

Contributing

  1. Fork it (https://github.com/elbywan/cryomongo/fork)
  2. Create your feature branch (git checkout -b my-new-feature)
  3. Commit your changes (git commit -am 'Add some feature')
  4. Push to the branch (git push origin my-new-feature)
  5. Create a new Pull Request

Contributors

Credit