Formats des données d'entrée et de sortie pour les modèles Decision Optimization Python DOcplex
Vous pouvez utiliser les formats de données d'entrée et de sortie suivants pour les modèles Python Decision Optimization.
Données d'entrée
Pour les modèles Python DOcplex, les données d'entrée peuvent être lues à partir d'un fichier dans n'importe quel format. Si vous souhaitez utiliser des sources externes pour alimenter vos données d'entrée à l'aide de connecteurs, vous devez utiliser des fichiers
pour vos données d'entrée..csv
get_all_inputs
pour lire des fichiers et renvoyer un dictionnaire d'images de données.from docplex.util.environment import get_environment import pandas from six import iteritems from collections.abc import Mapping from os.path import join, dirname, basename, splitext, exists import glob class _InputDict(dict): def __init__(self, directory, names): dict.__init__(self) self._directory = directory for k in names: dict.__setitem__(self, k, None) file='model_schema.json' if self._directory is not None: file = "{0}/".format(self._directory) + file self.dtype_schemas = self.get_dtype_schemas( file) def __getitem__(self, key): if isinstance(key, str): item = dict.__getitem__(self, key) if item is None: file = "{0}.csv".format(key) if file in self.dtype_schemas: return self.read_df( key, dtype=self.dtype_schemas[file]) else: return self.read_df( key) else: return item else: raise Exception("Accessing input dict via non string index") def read_df(self, key, **kwargs): env = get_environment() file = "{0}.csv".format(key) if self._directory is not None: file = "{0}/".format(self._directory) + file with env.get_input_stream(file) as ist: params = {'encoding': 'utf8'} if kwargs: params.update(kwargs) df = pandas.read_csv( ist, **params) dict.__setitem__(self, key, df) return df def get_dtype_schemas(self, path): dtype_schemas = {} if exists(path): input_schemas=json.load(open(path)) if 'input' in input_schemas: for input_schema in input_schemas['input']: dtype_schema = {} if 'fields' in input_schema: for input_schema_field in input_schema['fields']: if input_schema_field['type']=='string': dtype_schema[input_schema_field['name']]='str' if len(dtype_schema) > 0: dtype_schemas[input_schema['id']]=dtype_schema print(dtype_schemas) return dtype_schemas class _LazyDict(Mapping): def __init__(self, *args, **kw): self._raw_dict = _InputDict(*args, **kw) def __getitem__(self, key): return self._raw_dict.__getitem__(key) def __iter__(self): return iter(self._raw_dict) def __len__(self): return len(self._raw_dict) def read_df(self, key, **kwargs): return self._raw_dict.read_df(key, **kwargs) def get_all_inputs(directory=None): '''Utility method to read a list of files and return a tuple with all read data frames. Returns: a map { datasetname: data frame } ''' all_csv = "*.csv" g = join(directory, all_csv) if directory else all_csv names = [splitext(basename(f))[0] for f in glob.glob(g)] result = _LazyDict(directory, names) return result
Données de sortie
def write_all_outputs(outputs): '''Write all dataframes in ``outputs`` as .csv. Args: outputs: The map of outputs 'outputname' -> 'output df' ''' for (name, df) in iteritems(outputs): csv_file = '%s.csv' % name print(csv_file) with get_environment().get_output_stream(csv_file) as fp: if sys.version_info[0] < 3: fp.write(df.to_csv(index=False, encoding='utf8')) else: fp.write(df.to_csv(index=False).encode(encoding='utf8')) if len(outputs) == 0: print("Warning: no outputs written")
Vous pouvez également utiliser le
pour enregistrer vos sorties.get_environment().get_output_stream(csv_file) as fp:
Pour un exemple complet de déploiement d'un modèle Decision Optimization Python DOcplex, voir l'exemple Deploying a DO model with WML situé dans le dossier jupyter du DO-samples. Sélectionnez le sous-dossier correspondant au produit et à la version en question.
Cet exemple peut également être trouvé dans le Cloud Pak for Data Resource hub, voir Déploiement d'un modèle d'Decision Optimization dans watsonx.ai Runtime.