全文共 2014 个字

调用堆栈

    io.vertx.ext.mongo.impl.MongoClientImpl;

    io.vertx.ext.mongo.impl.MongoHolder;

    io.vertx.ext.mongo.impl.config.MongoClientOptionsParser;

实现过程

当调用MongoClient::createShared()MongoClient::createNonShared()方法创建mongo的客户端时,最终都会调用到MongoClientImpl的构造函数。

 public MongoClientImpl(Vertx vertx, JsonObject config, String dataSourceName) {
    Objects.requireNonNull(vertx);
    Objects.requireNonNull(config);
    Objects.requireNonNull(dataSourceName);
    this.vertx = vertx;
    // 检查或创建新的MongHolder
    this.holder = lookupHolder(dataSourceName, config);
    this.mongo = holder.mongo();
    this.useObjectId = config.getBoolean("useObjectId", false);
  }

如果是通过createNonShared方法创建client时,这里传入的dataSourceName是一个UUID。当使用createShared创建client,会在lookupHolder方法中检查是否已经创建了同名的客户端,否则新建。

下图是检查数据源的过程。会根据传入的 datasourceName vertx 实例的共享数据实例(io.vertx.core.shareddata.SharedData)中获取同名的 MongoHolder 实例。

 private MongoHolder lookupHolder(String datasourceName, JsonObject config) {
    synchronized (vertx) {
      // 获取共享数据实例中的map
      LocalMap<String, MongoHolder> map = vertx.sharedData().getLocalMap(DS_LOCAL_MAP_NAME);

      // 检查datasourceName对应的MongoHolder 是否存在
      MongoHolder theHolder = map.get(datasourceName);

      // 不存在则新构建,并将构建的结果放入sharedData的map中
      if (theHolder == null) {
        theHolder = new MongoHolder(config, () -> removeFromMap(map, datasourceName));
        map.put(datasourceName, theHolder);
      } else {
        // 递增被引用的计数
        theHolder.incRefCount();
      }
      return theHolder;
    }
  }

如果实例不存在,则会创建新的 MongoHolder 实例。个人认为这里有个很不完美的地方是为了解决懒汉模式的问题,增加了一个线程锁。在高并发请求数据库连接资源时,这里会有阻塞。因此我在自己的实现类中存储了MongoClient的实例。不过这个线程锁可以有效减少数据库连接池的爆发式增长,在数据库连接池资源较少的情况下,有不错的效果(比如我们某个项目使用了阿里云的mongDB,最低配置只有200个连接)

下图是 MongoHolder 的构造方法。

public MongoHolder(JsonObject config, Runnable closeRunner) {
      this.config = config;
      this.closeRunner = closeRunner;
    }

    MongoHolder 构造方法只是简单的设置了成员变量 config closeRunner 的值。closeRunner当调用MongoClient::close()方法时,用于回调销毁SharedData::localMap中的对应索引数据。config 是用户传入的配置参数,需要注意的是,config 传递到这里一直没有被改变。

创建 MongoHolder 的实例成功后,接下来会调用 MongoHolder::mongo() 来创建一个真正 com.mongodb.async.client.MongoClient 实例。这里同样使用了懒汉模式,存在线程锁,如果MongoClient的实例存在直接返回,如果不存在,则新建一个实例。

synchronized com.mongodb.async.client.MongoClient mongo() {
      if (mongo == null) {
        // 解析外部传递的config
        MongoClientOptionsParser parser = new MongoClientOptionsParser(config);

        // 将解析结果用于创建新的com.mongodb.async.client.MongoClient实例
        mongo = MongoClients.create(parser.settings());
        String dbName = config.getString("db_name", DEFAULT_DB_NAME);
        db = mongo.getDatabase(dbName);
      }
      return mongo;
    }

    MongoClientOptionsParser 对象是创建Mongo客户端的关键,他会解析用户传递的参数来创建mongDB客户端,理解他的解析方法有利于创建合适的客户端。

    MongoClientOptionsParser 的构造方法共有60行,这里分几段说明。为了更好的理解创建过程,建议了解下MongoDB异步Java驱动。下面的构造客户端参数的第一部分。

 public MongoClientOptionsParser(JsonObject config) {
    Objects.requireNonNull(config);
    
    // 创建mongoDB的构建对象
    MongoClientSettings.Builder options = MongoClientSettings.builder();

    // 注册对象数据的存储规则
    options.codecRegistry(CodecRegistries.fromRegistries(commonCodecRegistry, CodecRegistries.fromCodecs(new JsonObjectCodec(config))));

    // 获取连接串,所有的定义参数都来自连接串
    String cs = config.getString("connection_string");

    // 解析连接串
    ConnectionString connectionString = (cs == null) ? null : new ConnectionString(cs);

    // 解析集群参数
    ClusterSettings clusterSettings = new ClusterSettingsParser(connectionString, config).settings();
    options.clusterSettings(clusterSettings);

    // 解析连接池参数
    ConnectionPoolSettings connectionPoolSettings = new ConnectionPoolSettingsParser(connectionString, config).settings();
    options.connectionPoolSettings(connectionPoolSettings);

    // some code
}

首先创建 MongoClientSettings 的构造对象。

然后根据传递的参数构建 CodecRegistry 实例。CodecRegistry 的说明见 mongDB官网CodecRegistry的API说明CodecRegistry 用于指定相关的对象在mongoDB的读写实现类,例如官方已经源生实现了 StringCodec、IntegerCodec来处理Java的String、Integer对象

这段代码的最后部分,创建一个 ConnectionString 实例来分解和存储连接串的解析结果。ConnectionString 是 mongoDB 官方实现的解析连接串参数方法可以将http协议串解析成对应的初始化参数,例如设置连接池最小连接数为20,最大连接数为200: mongodb://host:27017/?minPoolSize=20&maxPoolSize=200 。详细说明见 ConnectionString 的API文档  和 mongoDB官方指引手册 。

下面的代码是 ClusterSettingsParser 对传入的数据进行解析,vertx-mongdb解析连接参数都是采用类似的思路:优先使用mongodb源生连接串中指定的参数,如果参数不存在,则使用用户传入的参数。因此,在我们设计mongodb的连接参数时,可以在传入的JsonObject实例中统一在key="connection_string"的参数中一次性制定mongdb风格的连接字符串,还可以在这个实例中通过key值设置vertx风格的各种连接参数。如果2个参数都存在,则优先使用连接字符串。

public ClusterSettingsParser(ConnectionString connectionString, JsonObject config) {

    // 创建mongdb集群builder方法
    ClusterSettings.Builder settings = ClusterSettings.builder();

    // 优先从连接字符串中使用mongdb源生方法解析相关参数 
    if (connectionString != null) {
      settings.applyConnectionString(connectionString);
    } else {
      // 如果连接字符串中相关的参数不存在,则从用户传入的config中提取指定的数据
      // 设置host列表
      // 在parseHosts中优先解析config是否存在包含key=hosts的JsonArray实例,如果有则会即系多个连接服务器
      // 如果没有key=hosts,则解析host和port是否存在
      List<ServerAddress> hosts = parseHosts(config);
      settings.hosts(hosts);

      // 设置mongdb的运行模式和replica模式
      String replicaSet = config.getString("replicaSet");
      if (hosts.size() == 1 && replicaSet == null) {
        settings.mode(ClusterConnectionMode.SINGLE);
      } else {
        settings.mode(ClusterConnectionMode.MULTIPLE);
      }
      if (replicaSet != null) {
        settings.requiredReplicaSetName(replicaSet);
      }
    }

    this.settings = settings.build();
  }

这里就不一一说明每一个解析方法,基本上都是一样的套路。

解析完连接参数后,用这些参数直接调用MongoClients::create来创建mongdb的客户端实例。然后从客户端从获取mongodb的连接。

总结

至此,mongdb的创建过程完毕。在创建的过程中,可以实现mongdb源生的连接串,也可以使用vertx风格的JsonObject。mongdb自身已经实现了全异步接口,因此vertx-mongdb只是在此基础上进行了一层封装。下面的附表是vertx-mongdb相关的设置参数。可以在建立vertx-mongdb实例时,通过JsonObject传入。

{
  // 设置单个mongdb服务时使用host、port指定主机和端口
  "host" : "17.0.0.1", // string --mongdb实例所在的地址
  "port" : 27017,      // int --mongdb实例的端口号

  // 设置集群mongdb服务器时使用队列
  "hosts" : [
    {
      "host" : "cluster1", // string --集群1地址
      "port" : 27000       // int --集群1端口号
    },
    {
      "host" : "cluster2", // string --集群2地址
      "port" : 28000       // int --集群2端口号
    },
    ...
  ],

  // 数据库分布式方法
  "replicaSet" :  "foo"    // string

  // 连接池参数
  "maxPoolSize" : 100,                // int --最大连接数
  "minPoolSize" : 0,                // int --最小连接数
  "maxIdleTimeMS" : 0,          // long --单个连接空闲释放时间,0时表示没有时间限制
  "maxLifeTimeMS" : 0,         // long --单个连接最大存活时间,0时表示灭有时间限制
  "waitQueueMultiple"  : 500,         // int --等待获取连接的排队队列最大数量。
  "waitQueueTimeoutMS" : 120000,      // long --等待获取连接的最大等待时间。
  "maintenanceFrequencyMS" : 0,   // long
  "maintenanceInitialDelayMS" : 0, // long

  // 账户、密码、连接信息
  "username"   : "john",     // string
  "password"   : "passw0rd", // string
  "authSource" : "some.db"   // string
  "authMechanism"     : "GSSAPI",        // string --认证机制相关配置,详情见http://docs.mongodb.org/manual/core/authentication/
  "gssapiServiceName" : "myservicename", // string --Kerberos单点登录相关接口API配置。

  // 联网相关的配置
  "connectTimeoutMS" : 10000 , // int // --连接到mongdb数据库实例返回的等待时间
  "socketTimeoutMS"  : 0,    // int // --通过socket完成数据库相关操作的等待与返回时间,0时表示没有限制。
  "sendBufferSize"    : 0,  // int // --设置通过socket发送数据的缓存大小,0时表示使用操作系统默认值。
  "receiveBufferSize" : 0,  // int --设置通过socket获取数据的缓存大小,0时表示使用操作系统默认值。
  "keepAlive" : false       // boolean --设置是否保持数据库连接,默认为false

  // 设置集群之间的心跳配置
  "heartbeat.socket" : {
  "connectTimeoutMS" : 300000, // int 
  "socketTimeoutMS"  : 100000, // int
  "sendBufferSize"    : 8192,  // int
  "receiveBufferSize" : 8192,  // int
  "keepAlive" : true           // boolean
  }

  // 设置客户端和mongdb实例的心跳测试
  "heartbeatFrequencyMS" :    5000 // long 集群监视器监控到达每个mongdb实例的心跳频率
  "minHeartbeatFrequencyMS" : 1000 // long 当前客户端到服务器的监控频率
}